patch ob_admin

This commit is contained in:
simonjoylet 2021-11-09 17:33:42 +08:00 committed by LINxiansheng
parent 04ecb035fe
commit b5266f04ea
24 changed files with 5825 additions and 9 deletions

View File

@ -1,18 +1,22 @@
add_executable(ob_admin
clog_tool/cmd_args_parser.h
clog_tool/ob_admin_clog_v2_executor.cpp
clog_tool/ob_admin_clog_v2_executor.h
clog_tool/ob_func_utils.cpp
clog_tool/ob_func_utils.h
clog_tool/ob_ilog_entry_parser.cpp
clog_tool/ob_ilog_entry_parser.h
clog_tool/ob_log_entry_filter.cpp
clog_tool/ob_log_entry_filter.h
clog_tool/ob_log_entry_parser.cpp
clog_tool/ob_log_entry_parser.h
usec_tool/ob_admin_usec_executor.cpp
usec_tool/ob_admin_usec_executor.h
ob_admin_executor.h
slog_tool/ob_admin_slog_executor.cpp
dumpsst/ob_admin_dumpsst_executor.cpp
dumpsst/ob_admin_cmp_micro_executor.cpp
dumpsst/ob_admin_dumpsst_utils.cpp
dumpsst/ob_admin_dumpsst_print_helper.cpp
archive_tool/ob_fake_archive_log_file_store.cpp
archive_tool/ob_admin_log_archive_executor.cpp
archive_tool/ob_archive_entry_parser.cpp
archive_tool/ob_archive_fake_entry_iterator.cpp
archive_tool/ob_archive_fake_file_store.cpp
backup_tool/ob_admin_dump_backup_data_executor.cpp
ob_admin_executor.cpp
main.cpp)
@ -32,4 +36,4 @@ target_link_libraries(ob_admin
target_include_directories(ob_admin
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR})
${CMAKE_CURRENT_SOURCE_DIR})

View File

@ -0,0 +1,245 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX CLOG
#include <unistd.h>
#include <getopt.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "ob_admin_log_archive_executor.h"
#include "../clog_tool/ob_func_utils.h"
#include "../clog_tool/ob_log_entry_filter.h"
#include "../clog_tool/ob_log_entry_parser.h"
#include "../clog_tool/cmd_args_parser.h"
#include "ob_fake_archive_log_file_store.h"
#include "archive/ob_archive_entry_iterator.h"
#include "clog/ob_log_entry.h"
#include "share/ob_version.h"
#include "archive/ob_log_archive_struct.h"
#include "ob_archive_entry_parser.h"
using namespace oceanbase::common;
using namespace oceanbase::clog;
using namespace oceanbase::archive;
namespace oceanbase
{
namespace tools
{
#define getcfg(key) getenv(key)
int ObAdminLogArchiveExecutor::execute(int argc, char *argv[])
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(argc < 4) || OB_ISNULL(argv)) {
print_usage();
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(parse_options(argc, argv))) {
LOG_WARN("failed to parse options", K(ret));
} else {
int new_argc = argc - optind;
char **new_argv = argv + optind;
if (OB_NEED_RETRY != (ret = CmdCallSimple(new_argc, new_argv, dump_log):OB_NEED_RETRY)) {
LOG_INFO("finish dump_log ", K(ret));
} else if (OB_NEED_RETRY != (ret = CmdCallSimple(new_argc, new_argv, dump_index):OB_NEED_RETRY)) {
LOG_INFO("finish dump_index ", K(ret));
} else if (OB_NEED_RETRY != (ret = CmdCallSimple(new_argc, new_argv, dump_key):OB_NEED_RETRY)) {
LOG_INFO("finish dump_key ", K(ret));
} else {
fprintf(stderr, "failed %d", ret);
print_usage();
}
}
return ret;
}
void ObAdminLogArchiveExecutor::print_usage()
{
fprintf(stdout,
"Usages:\n"
"ob_admin archive_tool dump_log data_files ## ./ob_admin archive_tool dump_log 1 2 3\n"
"ob_admin archive_tool dump_index index_files ## ./ob_admin archive_tool dump_index 1 2 3\n"
"ob_admin archive_tool dump_key archive_key_files ## ./ob_admin archive_tool dump_key 1100611139403779_0\n"
);
}
int ObAdminLogArchiveExecutor::dump_log(int argc, char *argv[])
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(argc < 1) || OB_ISNULL(argv)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(argc), K(ret));
} else {
LOG_INFO("begin to dump all archive log file", K(ret));
for (int64_t i = 0; i < argc; ++i) {
if (OB_FAIL(dump_single_file(argv[i])) && OB_ITER_END != ret) {
LOG_WARN("failed to dump log ", K(argv[i]), K(ret));
} else if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
} else {/*do nothing*/}
}
if (OB_FAIL(ret)) {
print_usage();
}
}
return ret;
}
int ObAdminLogArchiveExecutor::dump_index(int argc, char *argv[])
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(argc < 1) || OB_ISNULL(argv)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(argc), K(ret));
} else {
LOG_INFO("begin to dump archive index file", K(ret));
for (int64_t i = 0; i < argc; ++i) {
if (OB_FAIL(dump_single_index_file(argv[i])) && OB_ITER_END != ret) {
LOG_WARN("failed to dump log ", K(argv[i]), K(ret));
} else if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
}
return ret;
}
int ObAdminLogArchiveExecutor::dump_key(int argc, char *argv[])
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(argc < 1) || OB_ISNULL(argv)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(argc), K(ret));
} else {
LOG_INFO("begin to dump archive index file", K(ret));
for (int64_t i = 0; i < argc; ++i) {
if (OB_FAIL(dump_single_key_file(argv[i])) && OB_ITER_END != ret) {
LOG_WARN("failed to dump log ", K(argv[i]), K(ret));
} else if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
}
return ret;
}
int ObAdminLogArchiveExecutor::dump_single_file(const char *path)
{
int ret = OB_SUCCESS;
uint64_t file_id = 0;
ObArchiveEntryParser entry_parser;
if (OB_FAIL(file_name_parser(path, file_id))) {
LOG_WARN("failed to parse file name", K(path), K(ret));
} else if (OB_FAIL(entry_parser.init(file_id, tenant_id_, DB_host_, DB_port_))) {
LOG_WARN("failed to init entry parser", K(path), K(ret));
} else if (OB_FAIL(entry_parser.dump_all_entry(path))) {
if (OB_ITER_END == ret) {
LOG_INFO("succ to dump_all_entry", K(path));
} else {
LOG_WARN("failed to dump_all_entry", K(path), K(ret));
}
} else {/*do nothing*/}
return ret;
}
int ObAdminLogArchiveExecutor::dump_single_index_file(const char *path)
{
int ret = OB_SUCCESS;
int fd = -1;
char *buf = NULL;
int64_t buf_len = -1;
uint64_t file_id = -1;
if (OB_FAIL(mmap_log_file(path, buf, buf_len, fd))) {
LOG_WARN("failed to mmap_log_file", K(path), K(ret));
} else if (OB_FAIL(file_name_parser(path, file_id))) {
LOG_WARN("failed to parse file name", K(path), K(ret));
} else {
ObArchiveIndexFileInfo info;
int64_t pos = 0;
while (OB_SUCCESS == ret && pos < buf_len) {
info.reset();
if (OB_FAIL(info.deserialize(buf, buf_len, pos))) {
LOG_WARN("deserialize info fail", K(ret), K(buf), K(buf_len));
} else {
fprintf(stdout, "$$$ %s\n", to_cstring(info));
}
}
}
if (fd >= 0) {
close(fd);
}
return ret;
}
int ObAdminLogArchiveExecutor::dump_single_key_file(const char *path)
{
int ret = OB_SUCCESS;
int fd = -1;
char *buf = NULL;
int64_t buf_len = -1;
if (OB_FAIL(mmap_log_file(path, buf, buf_len, fd))) {
LOG_WARN("failed to mmap_log_file", K(path), K(ret));
} else {
ObArchiveKeyContent key_content;
int64_t pos = 0;
if (OB_FAIL(key_content.deserialize(buf, buf_len, pos))) {
LOG_WARN("deserialize info fail", K(ret), K(buf), K(buf_len));
} else {
fprintf(stdout, "$$$ %s, %s\n", path, to_cstring(key_content));
}
}
if (fd >= 0) {
close(fd);
}
return ret;
}
int ObAdminLogArchiveExecutor::mmap_log_file(const char *path, char *&buf_out, int64_t &buf_len, int &fd)
{
int ret = OB_SUCCESS;
void *buf = NULL;
struct stat stat_buf;
const int64_t FILE_MAX_SIZE = 64 * 1024 * 1024;
if (OB_ISNULL(path)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid log file path is NULL", K(ret));
} else if (-1 == (fd = open(path, O_RDONLY))) {
ret = OB_IO_ERROR;
CLOG_LOG(ERROR, "open file fail", K(path), KERRMSG, K(ret));
} else if (-1 == fstat(fd, &stat_buf)) {
ret = OB_IO_ERROR;
CLOG_LOG(ERROR, "stat_buf error", K(path), KERRMSG, K(ret));
} else if (stat_buf.st_size > FILE_MAX_SIZE) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(ERROR, "invalid file size", K(path), K(stat_buf.st_size), K(ret));
} else if (MAP_FAILED == (buf = mmap(NULL, stat_buf.st_size, PROT_READ, MAP_SHARED, fd, 0)) || NULL == buf) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to mmap file", K(path), K(errno), KERRMSG, K(ret));
} else {
buf_out = static_cast<char *>(buf);
buf_len = stat_buf.st_size;
}
return ret;
}
}//namespace tools
}//namespace oceanbase

View File

@ -0,0 +1,45 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ADMIN_LOG_ARCHIVE_EXECUTOR_H_
#define OB_ADMIN_LOG_ARCHIVE_EXECUTOR_H_
#include "../ob_admin_executor.h"
#include "../clog_tool/ob_log_entry_filter.h"
namespace oceanbase
{
using namespace clog;
namespace tools
{
class ObAdminLogArchiveExecutor : public ObAdminExecutor
{
public:
ObAdminLogArchiveExecutor(){}
virtual ~ObAdminLogArchiveExecutor(){}
virtual int execute(int argc, char *argv[]);
int dump_single_file(const char *path);
int dump_single_index_file(const char *path);
int dump_single_key_file(const char *path);
private:
int dump_log(int argc, char *argv[]);
int dump_index(int argc, char *argv[]);
int dump_key(int argc, char *argv[]);
void print_usage();
int mmap_log_file(const char *path, char *&buf_out, int64_t &buf_len, int &fd);
};
}
}
#endif /* OB_ADMIN_LOG_ARCHIVE_V2_H_ */

View File

@ -0,0 +1,91 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_archive_entry_parser.h"
#include "archive/ob_log_archive_define.h"
#include "archive/ob_archive_entry_iterator.h"
#include "archive/ob_archive_block.h"
#include "ob_archive_fake_entry_iterator.h"
#include "ob_archive_fake_file_store.h"
#include <string.h>
namespace oceanbase
{
using namespace common;
using namespace memtable;
using namespace storage;
using namespace archive;
namespace archive
{
int ObArchiveEntryParser::init(uint64_t file_id,
const uint64_t tenant_id,
const common::ObString &host,
const int32_t port)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
} else if (OB_UNLIKELY(OB_INVALID_FILE_ID == file_id)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid buf or buf_len", K(file_id), K(ret));
} else {
file_id_ = file_id;
tenant_id_ = tenant_id;
if (!host.empty() && port > 0) {
if (OB_SUCCESS != client_.init()) {
CLOG_LOG(WARN, "failed to init net client", K(ret));
} else if (OB_SUCCESS != client_.get_proxy(rpc_proxy_)) {
CLOG_LOG(WARN, "failed to get_proxy", K(ret));
}
host_addr_.set_ip_addr(host, port);
rpc_proxy_.set_server(host_addr_);
}
is_inited_ = true;
}
return ret;
}
int ObArchiveEntryParser::dump_all_entry(const char *path)
{
int ret = OB_SUCCESS;
ObArchiveFakeFileStore file_store;
ObArchiveFakeEntryIterator iter;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
CLOG_LOG(ERROR, "log entry parser is not inited", K(is_inited_), K(ret));
} else if (OB_FAIL(file_store.init_for_dump(path))) {
CLOG_LOG(WARN, "init for dump fail", K(ret), K(path));
} else if (OB_FAIL(iter.init(tenant_id_, &file_store, &rpc_proxy_))) {
CLOG_LOG(WARN, "iter init fail", K(ret));
} else {
while (OB_SUCC(ret)) {
clog::ObLogEntry entry;
const int64_t fake_pos = 1;
bool unused_flag = false;
int64_t unused_checksum = -1;
if (OB_FAIL(iter.next_entry(entry, unused_flag, unused_checksum))) {
CLOG_LOG(WARN, "next entry fail", K(ret));
} else if (OB_FAIL(dump_clog_entry(entry, fake_pos))) {
CLOG_LOG(WARN, "dump clog entry fail", K(ret));
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
return ret;
}
}//end of namespace clog
}//end of namespace oceanbase

View File

@ -0,0 +1,60 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_ARCHIVE_OB_ARCHIVE_ENTRY_PARSER_
#define OCEANBASE_ARCHIVE_OB_ARCHIVE_ENTRY_PARSER_
#include "../clog_tool/ob_log_entry_parser.h"
#include "../clog_tool/ob_func_utils.h"
#include "archive/ob_archive_entry_iterator.h"
namespace oceanbase
{
namespace archive
{
class ObArchiveEntryParser : public clog::ObLogEntryParserImpl
{
public:
ObArchiveEntryParser() : is_inited_(false),
file_id_(0),
tenant_id_(0) {}
virtual ~ObArchiveEntryParser() {}
int init(uint64_t file_id, const uint64_t tenant_id, const common::ObString &host,
const int32_t port);
bool is_inited() const {return is_inited_;}
int dump_all_entry_1();
int dump_all_entry(const char *path);
TO_STRING_KV(K(is_inited_),
K(file_id_),
K(tenant_id_));
protected:
int parse_next_entry_();
int get_entry_type_(ObArchiveItemType &item_type);
void advance_(const int64_t step);
void skip_block_offset_tail_();
protected:
static const int64_t MAGIC_NUM_LEN = 2L;
static const int64_t PRINT_BUF_SIZE = 5 * 1024 * 1024;
bool is_inited_;
uint64_t file_id_;
uint64_t tenant_id_;
DISALLOW_COPY_AND_ASSIGN(ObArchiveEntryParser);
};
}//end of namespace clog
}//end of namespace oceanbase
#endif //OCEANBASE_ARCHIVE_OB_ARCHIVE_ENTRY_PARSER_

View File

@ -0,0 +1,48 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_archive_fake_entry_iterator.h"
namespace oceanbase
{
using namespace clog;
namespace archive
{
ObArchiveFakeEntryIterator::~ObArchiveFakeEntryIterator()
{
inited_ = false;
rpc_proxy_ = nullptr;
}
int ObArchiveFakeEntryIterator::init(const uint64_t tenant_id,
ObIArchiveLogFileStore *file_store,
obrpc::ObSrvRpcProxy *rpc_proxy)
{
int ret = OB_SUCCESS;
common::ObPGKey fake_key(1, 0, 0);
const uint64_t fake_file_id = 1;
const uint64_t tmp_tenant_id = 0 == tenant_id ? 1 : tenant_id;
if (OB_ISNULL(file_store) || NULL == rpc_proxy) {
ret = OB_INVALID_ARGUMENT;
ARCHIVE_LOG(ERROR, "invalid argument", K(ret), K(file_store), K(rpc_proxy));
} else if (OB_FAIL(ObArchiveEntryIterator::init(file_store, fake_key, fake_file_id, 0, 1000, false, tmp_tenant_id))) {
ARCHIVE_LOG(WARN, "init fail", K(ret));
} else {
rpc_proxy_ = rpc_proxy;
}
return ret;
}
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_ARCHIVE_FAKE_ENTRY_ITERATOR_
#define OCEANBASE_ARCHIVE_FAKE_ENTRY_ITERATOR_
#include "archive/ob_archive_entry_iterator.h"
#include "../clog_tool/ob_log_entry_parser.h"
namespace oceanbase
{
namespace archive
{
class ObArchiveFakeEntryIterator : public ObArchiveEntryIterator
{
public:
ObArchiveFakeEntryIterator(): inited_(false), rpc_proxy_(nullptr) {}
virtual ~ObArchiveFakeEntryIterator();
int init(const uint64_t tenant_id,
ObIArchiveLogFileStore *file_store,
obrpc::ObSrvRpcProxy *rpc_proxy);
private:
bool inited_;
obrpc::ObSrvRpcProxy *rpc_proxy_;
};
} // namespace archive
} // namespace oceanbase
#endif /* OCEANBASE_ARCHIVE_FAKE_ENTRY_ITERATOR_ */

View File

@ -0,0 +1,79 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <utime.h>
#include <dirent.h>
#include "ob_archive_fake_file_store.h"
namespace oceanbase
{
using namespace clog;
namespace archive
{
int ObArchiveFakeFileStore::init_for_dump(const char *path)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(path)) {
ret = OB_INVALID_ARGUMENT;
ARCHIVE_LOG(ERROR, "invalid argument", KR(ret), K(path));
} else {
MEMCPY(path_, path, MAX_PATH_LENGTH);
inited_ = true;
}
return ret;
}
int ObArchiveFakeFileStore::read_data_direct(const ObArchiveReadParam &param,
ObReadBuf &rbuf,
ObReadRes &res)
{
int ret = OB_SUCCESS;
int fd = 0;
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
ARCHIVE_LOG(WARN, "ObArchiveLogFileStore not init", K(ret), K(param));
} else if (-1 == (fd = ::open(path_, O_RDONLY))) {
ret = OB_IO_ERROR;
ARCHIVE_LOG(WARN, "open fail", K(ret), K(fd));
} else {
const int64_t buf_size = param.read_len_;
const int64_t offset = param.offset_;
int64_t read_size = 0;
int64_t one_read_size = 0;
while (OB_SUCC(ret) && read_size < buf_size) {
one_read_size = ::pread(fd, rbuf.buf_ + read_size, buf_size - read_size, offset + read_size);
if (0 == one_read_size) {
break;
} else if (one_read_size < 0) {
ret = OB_IO_ERROR;
ARCHIVE_LOG(ERROR, "one read size small than zero", K(ret));
} else {
read_size += one_read_size;
}
}
if (OB_SUCC(ret)) {
::close(fd);
res.buf_ = rbuf.buf_;
res.data_len_ = read_size;
}
}
return ret;
}
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_ARCHIVE_FAKE_FILE_STORE_
#define OCEANBASE_ARCHIVE_FAKE_FILE_STORE_
#include "archive/ob_archive_log_file_store.h"
#include "archive/ob_log_archive_struct.h"
#include "clog/ob_log_reader_interface.h"
namespace oceanbase
{
namespace archive
{
class ObArchiveFakeFileStore : public ObIArchiveLogFileStore
{
static const int64_t MAX_PATH_LENGTH = 1000;
public:
virtual int init_for_dump(const char *path);
virtual int read_data_direct(const ObArchiveReadParam &param,
clog::ObReadBuf &rbuf,
clog::ObReadRes &res);
private:
bool inited_;
char path_[MAX_PATH_LENGTH];
};
}
}
#endif /* OCEANBASE_ARCHIVE_FAKE_FILE_STORE_ */

View File

@ -0,0 +1,99 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX ARCHIVE
#include "ob_fake_archive_log_file_store.h"
#include "archive/ob_log_archive_struct.h"
#include "clog/ob_log_reader_interface.h"
namespace oceanbase
{
using namespace clog;
using namespace common;
using namespace archive;
namespace tools
{
void ObFakeArchiveLogFileStore::reset()
{
is_inited_ = false;
is_last_file_ = true;
buf_ = NULL;
buf_len_ = -1;
file_id_ = -1;
}
int ObFakeArchiveLogFileStore::init(char *buf,
int64_t buf_len,
const int64_t file_id)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_len <= 0 || file_id <=0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len), K(file_id));
} else {
is_last_file_ = true;//工具默认每个文件都是最后一个文件, 末尾会有写失败的遗留数据
buf_ = buf;
buf_len_ = buf_len;
file_id_ = file_id;
is_inited_ = true;
}
return ret;
}
int ObFakeArchiveLogFileStore::get_file_meta(const uint64_t file_id,
bool &is_last_log_file,
int64_t &file_len)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(file_id_ != file_id)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid file_id", K(ret));
} else {
is_last_log_file = is_last_file_;
file_len = buf_len_;
}
return ret;
}
int ObFakeArchiveLogFileStore::read_data_direct(const ObArchiveReadParam &param,
clog::ObReadBuf &rbuf,
clog::ObReadRes &res)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(!param.is_valid()
|| !rbuf.is_valid()
|| param.file_id_ != file_id_
|| param.read_len_ > rbuf.buf_len_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(param), K(rbuf), K(buf_len_), K(file_id_));
} else if (param.offset_ + param.read_len_ > buf_len_) {
const int64_t data_len = buf_len_ - param.offset_;
MEMCPY(rbuf.buf_, buf_ + param.offset_, data_len);
res.buf_ = rbuf.buf_;
res.data_len_ = data_len;
} else {
MEMCPY(rbuf.buf_, buf_ + param.offset_, param.read_len_);
res.buf_ = rbuf.buf_;
res.data_len_ = param.read_len_;
}
return ret;
}
}//end of namespace
}

View File

@ -0,0 +1,64 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_FAKE_ARCHIVE_LOG_FILE_STORE_H_
#define OB_FAKE_ARCHIVE_LOG_FILE_STORE_H_
#include "archive/ob_archive_log_file_store.h"
namespace oceanbase
{
namespace clog
{
struct ObReadBuf;
struct ObReadRes;
}
namespace archive
{
struct ObArchiveReadParam;
}
namespace tools
{
//used for ob_admin
class ObFakeArchiveLogFileStore : public archive::ObIArchiveLogFileStore
{
public:
ObFakeArchiveLogFileStore() {reset();}
virtual ~ObFakeArchiveLogFileStore(){reset();}
void reset();
public:
virtual int init() {return common::OB_NOT_SUPPORTED;};
int init(char *buf, int64_t buf_len, const int64_t file_id);
public:
virtual int get_file_meta(const uint64_t file_id, bool &is_last_log_file, int64_t &file_len);
/*param:读取日志的文件file_id, offset, len 等信息
* 访timeout
*
* offset+len未超过文件长度filestore本身不需要判断
*
* */
//TODO(yaoying.yyy):param file_id_t
virtual int read_data_direct(const archive::ObArchiveReadParam &param,
clog::ObReadBuf &rbuf,
clog::ObReadRes &res);
private:
bool is_inited_;
bool is_last_file_;
char *buf_;
int64_t buf_len_;
int64_t file_id_;
};
}//end of namespace tools
}//end of namespace oceanbase
#endif /* OB_FAKE_ARCHIVE_LOG_FILE_STORE_H_ */

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,189 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ADMIN_DUMPBACKUP_EXECUTOR_H_
#define OB_ADMIN_DUMPBACKUP_EXECUTOR_H_
#include "../ob_admin_executor.h"
#include "../dumpsst/ob_admin_dumpsst_print_helper.h"
#include "lib/container/ob_array.h"
#include "share/backup/ob_backup_struct.h"
#include "share/backup/ob_extern_backup_info_mgr.h"
#include "share/backup/ob_log_archive_backup_info_mgr.h"
#include "storage/ob_partition_base_data_physical_restore.h"
#include "share/backup/ob_tenant_name_mgr.h"
#include "storage/blocksstable/ob_block_sstable_struct.h"
#include "storage/blocksstable/ob_macro_block_meta_mgr.h"
#include "storage/blocksstable/ob_store_file.h"
#include "storage/ob_i_table.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_srv_network_frame.h"
#include "observer/omt/ob_worker_processor.h"
#include "observer/omt/ob_multi_tenant.h"
#include "storage/ob_i_table.h"
#include "storage/backup/ob_partition_backup_struct.h"
namespace oceanbase
{
namespace tools
{
typedef common::hash::ObHashMap<storage::ObITable::TableKey,
common::ObArray<storage::ObBackupTableMacroIndex> *> TableMacroIndexMap;
enum ObBackupMacroIndexVersion
{
BACKUP_MACRO_INDEX_VERSION_1, // before 2.2.77
BACKUP_MACRO_INDEX_VERSION_2, // after 3.1
};
class ObAdminDumpBackupDataReaderUtil
{
public:
static int get_data_type(
const char *data_path,
const char *storage_info,
ObBackupFileType &data_type);
static int get_backup_data_common_header(
const char *data_path,
const char *storage_info,
share::ObBackupCommonHeader &common_header);
static int get_backup_data(
const char *data_path,
const char *storage_info,
char *&buf,
int64_t &file_length,
common::ObIAllocator &allocator);
static int get_meta_index(
blocksstable::ObBufferReader &buffer_reader,
share::ObBackupCommonHeader &common_header,
common::ObIArray<ObBackupMetaIndex> &meta_index_array);
static int get_backup_data(
const char *data_path,
const char *storage_info,
char *&buf,
const int64_t offset,
const int64_t data_length,
common::ObIAllocator &allocator);
static int get_meta_header(
const char *buf,
const int64_t read_size,
share::ObBackupMetaHeader &common_header);
static int get_macro_block_index(
blocksstable::ObBufferReader &buffer_reader,
share::ObBackupCommonHeader &common_header,
common::ObIArray<ObBackupMacroIndex> &macro_index_array);
static int get_macro_block_index_v2(
blocksstable::ObBufferReader &buffer_reader,
common::ObIAllocator &allocator,
TableMacroIndexMap &map);
private:
static int add_sstable_index(
const storage::ObITable::TableKey &table_key,
const common::ObIArray<storage::ObBackupTableMacroIndex> &index_list,
common::ObIAllocator &allocator,
TableMacroIndexMap &index_map);
static int get_table_key_ptr(
common::ObIAllocator &allocator,
common::ObArray<storage::ObITable::TableKey *> &table_key_ptrs,
const storage::ObITable::TableKey &table_key,
const storage::ObITable::TableKey *&table_key_ptr);
};
class ObAdminDumpBackupDataExecutor : public ObAdminExecutor
{
public:
ObAdminDumpBackupDataExecutor();
virtual ~ObAdminDumpBackupDataExecutor();
virtual int execute(int argc, char *argv[]);
private:
int parse_cmd(int argc, char *argv[]);
void print_common_header();
void print_common_header(const share::ObBackupCommonHeader &common_header);
void print_meta_header(const share::ObBackupMetaHeader &meta_header);
void print_backup_info();
void print_backup_tenant_info();
void print_clog_backup_info();
void print_backup_set_info();
void print_backup_piece_info();
void print_backup_single_piece_info();
void print_backup_set_file_info();
void print_pg_list();
void print_meta_index();
int print_meta_index_(
const share::ObBackupCommonHeader &common_header,
const common::ObIArray<ObBackupMetaIndex> &meta_index_array);
void print_meta_data();
void print_partition_group_meta(
const char *buf,
const int64_t size);
void print_partition_meta(
const char *buf,
const int64_t size);
void print_sstable_meta(
const char *buf,
const int64_t size);
void print_table_keys(
const char *buf,
const int64_t size);
void print_partition_group_meta_info(
const char *buf,
const int64_t size);
void print_macro_data_index();
int get_macro_data_index_version(int64_t &version);
void print_macro_data_index_v1(
const char *buf,
const int64_t size);
int print_macro_data_index_v1_(
const share::ObBackupCommonHeader &common_header,
const common::ObIArray<ObBackupMacroIndex> &macro_index_array);
void print_macro_data_index_v2(
const char *buf,
const int64_t size);
int print_macro_data_index_v2_(
const TableMacroIndexMap &index_map);
void print_macro_block();
void print_tenant_name_info();
void print_tenant_name_info_meta(const ObTenantNameSimpleMgr::ObTenantNameMeta &meta);
void print_tenant_name_info(const int64_t idx,
const ObTenantNameSimpleMgr::ObTenantNameInfo &info);
void print_usage();
void print_macro_meta();
void print_super_block();
int dump_macro_block(const int64_t macro_id);
void dump_sstable();
void dump_sstable_meta();
int open_store_file();
void print_archive_block_meta();
void print_archive_index_file();
int check_exist(const char *data_path, const char *storage_info);
private:
static const int64_t MAX_BACKUP_ID_STR_LENGTH = 64;
private:
char data_path_[common::OB_MAX_URI_LENGTH];
char input_data_path_[common::OB_MAX_URI_LENGTH];
char storage_info_[share::OB_MAX_BACKUP_STORAGE_INFO_LENGTH];
share::ObBackupCommonHeader common_header_;
common::ObArenaAllocator allocator_;
bool is_quiet_;
int64_t offset_;
int64_t data_length_;
bool check_exist_;
};
} //namespace tools
} //namespace oceanbase
#endif /* OB_ADMIN_DUMPBACKUP_EXECUTOR_H_ */

View File

@ -0,0 +1,210 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_admin_cmp_micro_executor.h"
#include "lib/compress/ob_compressor_pool.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
namespace oceanbase
{
using namespace common;
using namespace blocksstable;
using namespace storage;
namespace tools
{
int ObAdminCmpMicroExecutor::execute(int argc, char *argv[])
{
int ret = OB_SUCCESS;
if (OB_FAIL(parse_cmd(argc, argv))) {
STORAGE_LOG(WARN, "failed to parse_cmd", K(argc), K(ret));
} else if (OB_FAIL(open_file())) {
STORAGE_LOG(WARN, "failed to open file 1", K(ret));
} else if (OB_FAIL(read_header())){
STORAGE_LOG(WARN, "failed to read macro hreader", K(ret));
} else if (OB_FAIL(compare_micro())) {
STORAGE_LOG(WARN, "failed to compare micro", K(ret));
}
return ret;
}
int ObAdminCmpMicroExecutor::open_file()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < 2; ++i) {
if (0 > (fd_[i] = ::open(data_file_path_[i], O_RDWR))) {
ret = OB_IO_ERROR;
STORAGE_LOG(WARN, "failed to open data_file", K(data_file_path_[i]), K(ret), K(errno), KERRMSG);
} else {
STORAGE_LOG(INFO, "open file success", K(data_file_path_[i]));
}
}
return ret;
}
int ObAdminCmpMicroExecutor::read_header()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < 2; ++i) {
int64_t pos = 0;
ObMacroBlockCommonHeader commo_header;
if (NULL == (macro_buf_[i] = (char*)::malloc(MACRO_BLOCK_SIZE))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to allocate macro buffer", K(ret), K(i));
} else if (NULL == (uncomp_micro_[i] = (char*)::malloc(MACRO_BLOCK_SIZE))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to allocate micro buffer", K(ret), K(i));
} else if (0 > pread(fd_[i], macro_buf_[i], MACRO_BLOCK_SIZE, macro_id_[i] * MACRO_BLOCK_SIZE)) {
ret = OB_IO_ERROR;
STORAGE_LOG(WARN, "failed to read macro block", K(ret), K(i), K(macro_id_[i]), K(errno), KERRMSG);
} else if (OB_FAIL(commo_header.deserialize(macro_buf_[i], MACRO_BLOCK_SIZE, pos))) {
STORAGE_LOG(WARN, "failed to deserialize common header", K(ret), K(i), K(commo_header), K(pos));
} else {
header_[i] = reinterpret_cast<ObSSTableMacroBlockHeader*>(macro_buf_[i] + pos);
}
}
if (OB_SUCC(ret) && (header_[0]->micro_block_count_ != header_[1]->micro_block_count_
|| 0 != strcmp(header_[0]->compressor_name_, header_[1]->compressor_name_))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "micro block count not match", K(*header_[0]), K(*header_[1]));
}
return ret;
}
int ObAdminCmpMicroExecutor::compare_micro()
{
int ret = OB_SUCCESS;
char *micro_data[2];
micro_data[0] = macro_buf_[0] + header_[0]->micro_block_data_offset_;
micro_data[1] = macro_buf_[1] + header_[1]->micro_block_data_offset_;
for (int64_t i = 0; OB_SUCC(ret) && i < header_[0]->micro_block_count_; ++i) {
for (int64_t idx = 0; OB_SUCC(ret) && idx < 2; ++idx) {
if (OB_FAIL(read_micro(micro_data[idx], idx))) {
STORAGE_LOG(WARN, "failed to read micro", K(ret), K(idx) , K(i));
}
}
if (record_header_[0].data_zlength_ != record_header_[1].data_zlength_
|| record_header_[0].data_length_ != record_header_[1].data_length_) {
STORAGE_LOG(WARN, "data length not equal", K(ret), K(record_header_[0]), K(record_header_[1]), K(i));
}
if (0 != MEMCMP(micro_buf_[0], micro_buf_[1], record_header_[0].data_zlength_)) {
STORAGE_LOG(WARN, "micro_buf not equal", K(i));
}
if (record_header_[0].data_length_ != record_header_[0].data_zlength_) {
for (int64_t idx = 0; OB_SUCC(ret) && idx < 2; ++idx) {
if (OB_FAIL(decompress_micro(idx))) {
STORAGE_LOG(WARN, "failed to read micro", K(ret), K(idx), K(i));
}
}
if (0 != MEMCMP(uncomp_micro_[0], uncomp_micro_[1], record_header_[0].data_length_)) {
print_micro_meta(0);
print_micro_meta(1);
for (int64_t c = 0; c < record_header_[0].data_length_; ++c) {
if (uncomp_micro_[0][c] != uncomp_micro_[1][c]) {
STORAGE_LOG(WARN, "found diff point", K(c), KPHEX(uncomp_micro_[0] + c, 1), KPHEX(uncomp_micro_[1] + c, 1));
break;
}
}
STORAGE_LOG(WARN, "uncomp_micro_ not equal", K(i), KPHEX(uncomp_micro_[0], record_header_[0].data_length_), KPHEX(uncomp_micro_[1], record_header_[1].data_length_));
}
}
}
return ret;
}
int ObAdminCmpMicroExecutor::read_micro(char *&micro_data, const int64_t idx)
{
char *cur_micro = micro_data;
record_header_[idx] = *reinterpret_cast<ObRecordHeaderV3*>(micro_data);
micro_data += record_header_[idx].get_serialize_size();
micro_buf_[idx] = micro_data;
micro_data += record_header_[idx].data_zlength_;
return ObRecordHeaderV3::deserialize_and_check_record(cur_micro, record_header_[idx].data_zlength_ + record_header_[idx].get_serialize_size(), MICRO_BLOCK_HEADER_MAGIC);
}
int ObAdminCmpMicroExecutor::decompress_micro(const int64_t idx)
{
int ret = OB_SUCCESS;
ObCompressor *compressor = NULL;
int64_t uncomp_size = 0;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(header_[idx]->compressor_name_, compressor))) {
STORAGE_LOG(WARN, "failed to get compressor", K(ret), K(header_[idx]->compressor_name_));
} else if (OB_FAIL(compressor->decompress(micro_buf_[idx], record_header_[idx].data_zlength_, uncomp_micro_[idx], MACRO_BLOCK_SIZE, uncomp_size))) {
STORAGE_LOG(WARN, "failed to decompress", K(ret) , K(idx), K(record_header_[idx]));
} else if (uncomp_size != record_header_[idx].data_length_) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "uncomp size not match", K(uncomp_size), K(record_header_[idx]));
} else {
STORAGE_LOG(INFO, "decompress success", K(uncomp_size), K(record_header_[idx].data_length_), K(idx));
}
return ret;
}
int ObAdminCmpMicroExecutor::parse_cmd(int argc, char *argv[])
{
int ret = OB_SUCCESS;
int opt = 0;
const char* opt_string = "a:b:c:d:";
struct option longopts[] = {
// commands
{ "data_dir_0", 1, NULL, 'a' },
// options
{ "data_dir_1", 1, NULL, 'b' },
{ "macro-id-0", 1, NULL, 'c' },
{ "macro-id-1", 1, NULL, 'd' },
};
int index = -1;
while ((opt = getopt_long(argc, argv, opt_string, longopts, &index)) != -1) {
switch (opt) {
case 'a': {
if (OB_FAIL(databuff_printf(data_file_path_[0], OB_MAX_FILE_NAME_LENGTH, "%s/%s/%s",
optarg, BLOCK_SSTBALE_DIR_NAME, BLOCK_SSTBALE_FILE_NAME))) {
STORAGE_LOG(WARN, "failed to databuff_printf block file path", K(ret), K(optarg));
}
break;
}
case 'b': {
if (OB_FAIL(databuff_printf(data_file_path_[1], OB_MAX_FILE_NAME_LENGTH, "%s/%s/%s",
optarg, BLOCK_SSTBALE_DIR_NAME, BLOCK_SSTBALE_FILE_NAME))) {
STORAGE_LOG(WARN, "failed to databuff_printf block file path", K(ret), K(optarg));
}
break;
}
case 'c': {
macro_id_[0] = strtoll(optarg, NULL, 10);
break;
}
case 'd': {
macro_id_[1] = strtoll(optarg, NULL, 10);
break;
}
default: {
exit(1);
}
}
}
return ret;
}
void ObAdminCmpMicroExecutor::print_micro_meta(const int64_t idx)
{
ObMicroBlockHeaderV2 *micro_header = reinterpret_cast<ObMicroBlockHeaderV2*>(uncomp_micro_[idx]);
STORAGE_LOG(INFO, "micro meta", K(idx), K(*micro_header));
}
}
}

View File

@ -0,0 +1,51 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_TOOLS_OB_ADMIN_CMP_MICRO_EXECUTOR_H
#define OCEANBASE_TOOLS_OB_ADMIN_CMP_MICRO_EXECUTOR_H
#include "../ob_admin_executor.h"
#include "storage/blocksstable/ob_block_sstable_struct.h"
namespace oceanbase
{
namespace tools
{
class ObAdminCmpMicroExecutor : public ObAdminExecutor
{
public:
ObAdminCmpMicroExecutor() {}
virtual ~ObAdminCmpMicroExecutor() {}
virtual int execute(int argc, char *argv[]);
private:
int parse_cmd(int argc, char *argv[]);
int open_file();
int read_header();
int compare_micro();
int read_micro(char *&micro_data, const int64_t idx);
int decompress_micro(const int64_t idx);
void print_micro_meta(const int64_t idx);
private:
static const int64_t MACRO_BLOCK_SIZE = 2 << 20;
char data_file_path_[2][common::OB_MAX_FILE_NAME_LENGTH];
int64_t macro_id_[2];
int fd_[2];
blocksstable::ObRecordHeaderV3 record_header_[2];
char *micro_buf_[2];
char *uncomp_micro_[2];
char *macro_buf_[2];
blocksstable::ObSSTableMacroBlockHeader *header_[2];
};
}
}
#endif /* OCEANBASE_TOOLS_OB_ADMIN_CMP_MICRO_EXECUTOR_H */

View File

@ -0,0 +1,789 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_admin_dumpsst_executor.h"
#include "ob_admin_dumpsst_utils.h"
#include "storage/blocksstable/ob_store_file.h"
#include "storage/ob_partition_service.h"
#include "storage/ob_table_mgr.h"
#include "storage/ob_table_mgr_meta_block_reader.h"
#include "storage/ob_sstable.h"
#include "storage/blocksstable/ob_macro_meta_block_reader.h"
#include "storage/ob_tenant_config_mgr.h"
#include "storage/ob_tenant_config_meta_block_reader.h"
#include "storage/blocksstable/ob_micro_block_scanner.h"
#include "storage/blocksstable/ob_micro_block_index_reader.h"
#include "storage/blocksstable/ob_sstable_printer.h"
#include "observer/ob_server_struct.h"
#include "storage/ob_file_system_util.h"
#include "storage/ob_pg_storage.h"
using namespace oceanbase::common;
using namespace oceanbase::blocksstable;
using namespace oceanbase::storage;
#define HELP_FMT "\t%-30s%-12s\n"
namespace oceanbase
{
namespace tools
{
ObAdminDumpsstExecutor::ObAdminDumpsstExecutor()
: storage_env_(),
is_quiet_(false),
in_csv_(false),
cmd_(DUMP_MAX),
skip_log_replay_(false),
dump_macro_context_(),
reload_config_(ObServerConfig::get_instance(), GCTX), config_mgr_(ObServerConfig::get_instance(), reload_config_)
{
storage_env_.data_dir_ = data_dir_;
storage_env_.sstable_dir_ = sstable_dir_;
storage_env_.default_block_size_ = 2 * 1024 * 1024;
storage_env_.log_spec_.log_dir_ = slog_dir_;
storage_env_.log_spec_.max_log_size_ = 256 << 20;
storage_env_.clog_dir_ = clog_dir_;
storage_env_.ilog_dir_ = ilog_dir_;
storage_env_.clog_shm_path_ = clog_shm_path_;
storage_env_.ilog_shm_path_ = ilog_shm_path_;
storage_env_.bf_cache_miss_count_threshold_ = 0;
storage_env_.bf_cache_priority_ = 1;
storage_env_.index_cache_priority_ = 10;
storage_env_.user_block_cache_priority_ = 1;
storage_env_.user_row_cache_priority_ = 1;
storage_env_.fuse_row_cache_priority_ = 1;
storage_env_.clog_cache_priority_ = 1;
storage_env_.index_clog_cache_priority_ = 1;
storage_env_.ethernet_speed_ = 10000;
GCONF.datafile_size = 128 * 1024 * 1024;
}
ObAdminDumpsstExecutor::~ObAdminDumpsstExecutor()
{
ObStoreFileSystemWrapper::destroy();
}
int ObAdminDumpsstExecutor::execute(int argc, char *argv[])
{
int ret = OB_SUCCESS;
ObPartitionService partition_service;
UNUSED(argc);
UNUSED(argv);
if (OB_SUCC(parse_cmd(argc, argv))) {
if (is_quiet_) {
OB_LOGGER.set_log_level("ERROR");
} else {
OB_LOGGER.set_log_level("INFO");
}
lib::set_memory_limit(96 * 1024 * 1024 * 1024LL);
lib::set_tenant_memory_limit(500, 96 * 1024 * 1024 * 1024LL);
char ip_port_str[MAX_PATH_SIZE] = {};
if (skip_log_replay_ && DUMP_MACRO_DATA != cmd_) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(ERROR, "Only dump macro block can skip slog replay, ", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().init(2 * 1024 * 1024 * 1024LL))) {
STORAGE_LOG(ERROR, "Fail to init io manager, ", K(ret));
} else if (OB_FAIL(ObKVGlobalCache::get_instance().init(1024L, 512 * 1024 * 1024, 64 * 1024))) {
STORAGE_LOG(ERROR, "Fail to init kv cache, ", K(ret));
} else if (OB_FAIL(OB_STORE_CACHE.init(
storage_env_.index_cache_priority_,
storage_env_.user_block_cache_priority_,
storage_env_.user_row_cache_priority_,
storage_env_.fuse_row_cache_priority_,
storage_env_.bf_cache_priority_,
storage_env_.bf_cache_miss_count_threshold_))) {
STORAGE_LOG(WARN, "Fail to init OB_STORE_CACHE, ", K(ret), K(storage_env_.data_dir_));
} else if (OB_FAIL(load_config())) {
STORAGE_LOG(WARN, "fail to load config", K(ret));
} else if (OB_FAIL(GCTX.self_addr_.ip_port_to_string(ip_port_str, MAX_PATH_SIZE))) {
STORAGE_LOG(WARN, "get server ip port fail", K(ret), K(GCTX.self_addr_));
} else if (OB_FAIL(OB_FILE_SYSTEM_ROUTER.init(data_dir_,
config_mgr_.get_config().cluster.str(),
config_mgr_.get_config().cluster_id.get_value(),
config_mgr_.get_config().zone.str(),
ip_port_str))) {
STORAGE_LOG(WARN, "fail to init file system", K(ret));
} else if (OB_FAIL(ObStoreFileSystemWrapper::init(storage_env_, partition_service))) {
STORAGE_LOG(ERROR, "Fail to init store file, ", K(ret));
} else {
STORAGE_LOG(INFO, "cmd is", K(cmd_));
switch (cmd_) {
case DUMP_MACRO_DATA:
case DUMP_MACRO_META:
case DUMP_SSTABLE:
case DUMP_SSTABLE_META:
case PRINT_MACRO_BLOCK:
if (OB_FAIL(open_store_file())) {
STORAGE_LOG(ERROR, "failed to open store file", K(ret));
}
break;
default:
break;
}
if (OB_SUCC(ret)) {
switch (cmd_) {
case DUMP_SUPER_BLOCK:
print_super_block();
break;
case DUMP_MACRO_DATA:
dump_macro_block(dump_macro_context_);
break;
case DUMP_MACRO_META:
print_macro_meta();
break;
case DUMP_SSTABLE:
dump_sstable();
break;
case PRINT_MACRO_BLOCK:
print_macro_block();
break;
case DUMP_SSTABLE_META:
dump_sstable_meta();
break;
default:
print_usage();
exit(1);
}
}
}
}
return ret;
}
int ObAdminDumpsstExecutor::load_config()
{
int ret = OB_SUCCESS;
// set dump path
const char *dump_path = "etc/observer.config.bin";
config_mgr_.set_dump_path(dump_path);
if (OB_FAIL(config_mgr_.load_config())) {
STORAGE_LOG(WARN, "fail to load config", K(ret));
} else {
ObServerConfig &config = config_mgr_.get_config();
int32_t local_port = static_cast<int32_t>(config.rpc_port);
int32_t ipv4 = ntohl(obsys::CNetUtil::getLocalAddr(config.devname));
GCTX.self_addr_.set_ipv4_addr(ipv4, local_port);
}
return ret;
}
int ObAdminDumpsstExecutor::parse_cmd(int argc, char *argv[])
{
int ret = OB_SUCCESS;
int opt = 0;
const char* opt_string = "d:hf:a:i:n:qt:s:";
struct option longopts[] = {
// commands
{ "dump", 1, NULL, 'd' },
{ "help", 0, NULL, 'h' },
// options
{ "file", 1, NULL, 'f' },
{ "macro-id", 1, NULL, 'a' },
{ "micro-id", 1, NULL, 'i' },
{ "macro-size", 1, NULL, 'n' },
{ "csv", 0, NULL, 0 },
{ "table_key", 1, NULL, 't'},
{ "quiet", 0, NULL, 'q' },
{ "skip_replay", 0, NULL, 's' },
};
int index = -1;
while ((opt = getopt_long(argc, argv, opt_string, longopts, &index)) != -1) {
switch (opt) {
case 'h': {
print_usage();
exit(1);
}
case 'd': {
if (0 == strcmp(optarg, "sb") || 0 == strcmp(optarg, "super_block")) {
cmd_ = DUMP_SUPER_BLOCK;
} else if (0 == strcmp(optarg, "mb") || 0 == strcmp(optarg, "macro_block")) {
cmd_ = DUMP_MACRO_DATA;
} else if (0 == strcmp(optarg, "mm") || 0 == strcmp(optarg, "macro_meta")) {
cmd_ = DUMP_MACRO_META;
} else if (0 == strcmp(optarg, "sst") || 0 == strcmp(optarg, "sstable")) {
cmd_ = DUMP_SSTABLE;
} else if (0 == strcmp(optarg, "sstm") || 0 == strcmp(optarg, "sstable_meta")) {
cmd_ = DUMP_SSTABLE_META;
} else if (0 == strcmp(optarg, "pm") || 0 == strcmp(optarg, "print_macro")) {
cmd_ = PRINT_MACRO_BLOCK;
} else {
print_usage();
exit(1);
}
break;
}
case 'f': {
int pret = 0;
strcpy(data_dir_, optarg);
pret = snprintf(slog_dir_, OB_MAX_FILE_NAME_LENGTH, "%s/slog", data_dir_);
if (pret < 0 || pret >= OB_MAX_FILE_NAME_LENGTH) {
ret = OB_BUF_NOT_ENOUGH;
STORAGE_LOG(ERROR, "concatenate slog path fail", K(ret));
}
if (OB_SUCC(ret)) {
pret = snprintf(sstable_dir_, OB_MAX_FILE_NAME_LENGTH, "%s/sstable", data_dir_);
if (pret < 0 || pret >= OB_MAX_FILE_NAME_LENGTH) {
ret = OB_BUF_NOT_ENOUGH;
STORAGE_LOG(ERROR, "concatenate slog path fail", K(ret));
}
}
break;
}
case 'a': {
dump_macro_context_.macro_id_ = strtoll(optarg, NULL, 10);
break;
}
case 'i': {
dump_macro_context_.micro_id_ = strtoll(optarg, NULL, 10);
break;
}
case 'n': {
storage_env_.default_block_size_ = strtoll(optarg, NULL, 10);
break;
}
case 'q': {
is_quiet_ = true;
break;
}
case 's': {
skip_log_replay_ = true;
break;
}
case 0: {
std::string name = longopts[index].name;
if ("csv" == name) {
in_csv_ = true;
}
break;
}
case 't': {
if (OB_FAIL(parse_table_key(optarg, table_key_))) {
printf("failed to parse table key\n");
print_usage();
exit(1);
}
break;
}
default: {
print_usage();
exit(1);
}
}
}
return ret;
}
void ObAdminDumpsstExecutor::print_super_block()
{
fprintf(stdout, "SuperBlock: %s\n", to_cstring(OB_FILE_SYSTEM.get_server_super_block()));
}
void ObAdminDumpsstExecutor::print_macro_block()
{
int ret = OB_SUCCESS;
STORAGE_LOG(ERROR, "not supported command", K(ret));
}
int ObAdminDumpsstExecutor::dump_macro_block(const ObDumpMacroBlockContext &macro_block_context)
{
int ret = OB_SUCCESS;
ObStorageFileWithRef file_with_ref;
ObStorageFileHandle file_handle;
ObStorageFile *file = nullptr;
ObMacroBlockHandle macro_handle;
ObSSTableDataBlockReader macro_reader;
ObMacroBlockReadInfo read_info;
ObMacroBlockCtx block_ctx;
block_ctx.sstable_block_id_.macro_block_id_.set_local_block_id(macro_block_context.macro_id_);
block_ctx.sstable_block_id_.macro_block_id_in_files_ = 0;
read_info.macro_block_ctx_ = &block_ctx;
read_info.io_desc_.category_ = SYS_IO;
read_info.offset_ = 0;
read_info.size_ = ObStoreFileSystemWrapper::get_instance().get_macro_block_size();
STORAGE_LOG(INFO, "begin dump macro block", K(macro_block_context));
if (OB_FAIL(file_handle.assign(OB_FILE_SYSTEM.get_server_root_handle()))) {
STORAGE_LOG(WARN, "fail to assign file handle", K(ret));
} else if (!macro_block_context.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(ERROR, "invalid macro block id", K(macro_block_context), K(ret));
} else if (OB_ISNULL(file = file_handle.get_storage_file())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "storage file is null", K(ret), K(file_handle));
} else if (FALSE_IT(macro_handle.set_file(file))) {
} else if (OB_FAIL(file->read_block(read_info, macro_handle))) {
STORAGE_LOG(ERROR, "Fail to read macro block, ", K(ret), K(read_info));
} else if (OB_FAIL(macro_reader.init(macro_handle.get_buffer(), macro_handle.get_data_size()))) {
STORAGE_LOG(ERROR, "failed to get macro meta", K(block_ctx), K(ret));
STORAGE_LOG(ERROR, "failed to init macro reader", K(ret));
} else if (OB_FAIL(macro_reader.dump())) {
STORAGE_LOG(ERROR, "failed dump macro block", K(ret));
}
macro_handle.reset();
if (nullptr != file) {
OB_FILE_SYSTEM.free_file(file);
}
STORAGE_LOG(INFO, "finish dump macro block", K(macro_block_context));
return ret;
}
int ObAdminDumpsstExecutor::open_store_file()
{
int ret = OB_SUCCESS;
return ret;
}
void ObAdminDumpsstExecutor::print_macro_meta()
{
// int ret = OB_SUCCESS;
const ObMacroBlockMeta *meta = NULL;
ObMacroBlockMetaHandle meta_handle;
MacroBlockId macro_id(dump_macro_context_.macro_id_);
/*if (OB_FAIL(ObMacroBlockMetaMgr::get_instance().get_meta(macro_id, meta_handle))) {
STORAGE_LOG(ERROR, "failed to get meta", K(ret));
} else if (OB_ISNULL(meta = meta_handle.get_meta())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "meta is null", K(ret));
} else {*/
PrintHelper::print_dump_title("Macro Meta");
PrintHelper::print_dump_line("macro_block_id", dump_macro_context_.macro_id_);
PrintHelper::print_dump_line("attr", meta->attr_);
PrintHelper::print_dump_line("data_version", meta->data_version_);
PrintHelper::print_dump_line("column_number", meta->column_number_);
PrintHelper::print_dump_line("rowkey_column_number", meta->rowkey_column_number_);
PrintHelper::print_dump_line("column_index_scale", meta->column_index_scale_);
PrintHelper::print_dump_line("row_store_type", meta->row_store_type_);
PrintHelper::print_dump_line("row_count", meta->row_count_);
PrintHelper::print_dump_line("occupy_size", meta->occupy_size_);
PrintHelper::print_dump_line("data_checksum", meta->data_checksum_);
PrintHelper::print_dump_line("micro_block_count", meta->micro_block_count_);
PrintHelper::print_dump_line("micro_block_data_offset", meta->micro_block_data_offset_);
PrintHelper::print_dump_line("micro_block_index_offset", meta->micro_block_index_offset_);
PrintHelper::print_dump_line("micro_block_endkey_offset", meta->micro_block_endkey_offset_);
PrintHelper::print_dump_line("compressor", meta->compressor_);
PrintHelper::print_dump_line("table_id", meta->table_id_);
PrintHelper::print_dump_line("data_seq", meta->data_seq_);
PrintHelper::print_dump_line("schema_version", meta->schema_version_);
PrintHelper::print_dump_line("snapshot_version", meta->snapshot_version_);
PrintHelper::print_dump_line("schema_rowkey_col_cnt", meta->schema_rowkey_col_cnt_);
PrintHelper::print_dump_line("row_count_delta", meta->row_count_delta_);
PrintHelper::print_dump_line("macro_block_deletion_flag", meta->macro_block_deletion_flag_);
PrintHelper::print_dump_list_start("column_id_array");
for (int64_t i = 0; i < meta->column_number_; ++i) {
PrintHelper::print_dump_list_value(meta->column_id_array_[i], i == meta->rowkey_column_number_ - 1);
}
PrintHelper::print_dump_list_end();
PrintHelper::print_dump_list_start("column_type_array");
for (int64_t i = 0; i < meta->column_number_; ++i) {
PrintHelper::print_dump_list_value(to_cstring(meta->column_type_array_[i]), i == meta->rowkey_column_number_ - 1);
}
PrintHelper::print_dump_list_end();
PrintHelper::print_dump_list_start("column_checksum");
for (int64_t i = 0; i < meta->column_number_; ++i) {
PrintHelper::print_dump_list_value(meta->column_checksum_[i], i== meta->rowkey_column_number_ - 1);
}
PrintHelper::print_dump_list_end();
PrintHelper::print_dump_list_start("end_key");
for (int64_t i = 0; i < meta->rowkey_column_number_; ++i) {
PrintHelper::print_cell(meta->endkey_[i], in_csv_);
}
PrintHelper::print_dump_list_end();
PrintHelper::print_dump_list_start("column_order");
for (int64_t i = 0; i < meta->column_number_; ++i) {
PrintHelper::print_dump_list_value(meta->column_order_array_[i], i == meta->column_number_ - 1);
}
PrintHelper::print_dump_list_end();
PrintHelper::print_end_line();
//}
}
void ObAdminDumpsstExecutor::dump_sstable()
{
int ret = OB_SUCCESS;
ObSSTable *sstable = NULL;
if (OB_FAIL(replay_slog_to_get_sstable(sstable))) {
STORAGE_LOG(ERROR, "failed to acquire table", K_(table_key), K(ret));
} else if (OB_ISNULL(sstable)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "sstable is null", K(ret));
} else {
const ObIArray<blocksstable::MacroBlockId> &macro_array = sstable->get_macro_block_ids();
ObDumpMacroBlockContext context;
for (int64_t i = 0; OB_SUCC(ret) && i < macro_array.count(); ++i) {
context.macro_id_ = macro_array.at(i).block_index();
if (OB_FAIL(dump_macro_block(context))) {
STORAGE_LOG(ERROR, "failed to dump macro block", K(context), K(ret));
}
}
if (OB_SUCC(ret) && sstable->has_lob_macro_blocks()) {
for (int64_t i = 0; OB_SUCC(ret) && i < sstable->get_lob_macro_block_ids().count(); ++i) {
context.macro_id_ = sstable->get_lob_macro_block_ids().at(i).block_index();
if (OB_FAIL(dump_macro_block(context))) {
STORAGE_LOG(ERROR, "Failed to dump lob macro block", K(context), K(ret));
}
}
}
}
if (OB_FAIL(ret)) {
printf("failed to dump sstable, ret = %d\n", ret);
}
}
void ObAdminDumpsstExecutor::dump_sstable_meta()
{
int ret = OB_SUCCESS;
ObTableHandle handle;
ObSSTable *sstable = NULL;
if (OB_FAIL(ObPartitionService::get_instance().acquire_sstable(table_key_, handle))) {
STORAGE_LOG(ERROR, "fail to acquire table", K(ret), K(table_key_));
} else if (OB_FAIL(handle.get_sstable(sstable))) {
STORAGE_LOG(ERROR, "fail to get sstable", K(ret));
} else if (OB_ISNULL(sstable)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "error unexpected, sstable must not be NULL", K(ret));
} else {
char buf[1024];
const ObSSTableMeta &meta = sstable->get_meta();
snprintf(buf, 1024, "table_id=%ld, partition_id=%ld", table_key_.table_id_, table_key_.pkey_.get_partition_id());
PrintHelper::print_dump_title(buf);
PrintHelper::print_dump_line("index_id", meta.index_id_);
PrintHelper::print_dump_line("row_count", meta.row_count_);
PrintHelper::print_dump_line("occupy_size", meta.occupy_size_);
PrintHelper::print_dump_line("data_checksum", meta.data_checksum_);
PrintHelper::print_dump_line("row_checksum", meta.row_checksum_);
PrintHelper::print_dump_line("macro_block_count", meta.macro_block_count_);
PrintHelper::print_dump_line("use_old_macro_block_count", meta.use_old_macro_block_count_);
PrintHelper::print_dump_line("column_count", meta.column_cnt_);
PrintHelper::print_dump_line("lob_macro_block_count", meta.lob_macro_block_count_);
PrintHelper::print_dump_line("lob_use_old_macro_block_count", meta.lob_use_old_macro_block_count_);
PrintHelper::print_dump_line("schema_version", meta.schema_version_);
PrintHelper::print_dump_line("progressive_merge_start_version", meta.progressive_merge_start_version_);
PrintHelper::print_dump_line("progressive_merge_end_version", meta.progressive_merge_end_version_);
PrintHelper::print_dump_line("checksum_method", meta.checksum_method_);
meta.column_metas_.to_string(buf, 1024);
PrintHelper::print_dump_line("column_metas", buf);
meta.new_column_metas_.to_string(buf, 1024);
PrintHelper::print_dump_line("new_column_metas", buf);
}
}
int ObAdminDumpsstExecutor::replay_slog_to_get_sstable(ObSSTable *&sstable)
{
int ret = OB_SUCCESS;
ObBaseFileMgr file_mgr;
ObPartitionMetaRedoModule pg_mgr;
ObPartitionComponentFactory cp_fty;
if (OB_FAIL(file_mgr.init())) {
STORAGE_LOG(WARN, "fail to init file mgr", K(ret));
} else if (OB_FAIL(pg_mgr.init(&cp_fty,
&share::schema::ObMultiVersionSchemaService::get_instance(),
&file_mgr))) {
STORAGE_LOG(WARN, "fail to init pg mgr", K(ret));
} else {
ObAdminSlogReplayer replayer(file_mgr, pg_mgr, slog_dir_);
replayer.init();
if (OB_FAIL(replayer.replay_slog())) {
STORAGE_LOG(WARN, "fail to replay slog", K(ret));
} else if (OB_FAIL(replayer.get_sstable(table_key_, sstable))) {
STORAGE_LOG(WARN, "fail to get sstable", K(ret));
}
}
return ret;
}
ObAdminSlogReplayer::ObAdminSlogReplayer(
ObBaseFileMgr &file_mgr,
ObPartitionMetaRedoModule &pg_mgr,
char *slog_dir)
:
svr_addr_(GCTX.self_addr_),
svr_root_(),
file_mgr_(file_mgr),
pg_mgr_(pg_mgr),
super_block_(),
pg_meta_reader_(),
tenant_file_reader_(),
slog_dir_(slog_dir)
{
}
ObAdminSlogReplayer::~ObAdminSlogReplayer()
{
reset();
}
int ObAdminSlogReplayer::init()
{
int ret = OB_SUCCESS;
common::ObLogCursor checkpoint;
blocksstable::ObServerSuperBlock super_block;
ObStorageFileWithRef file_with_ref;
ObStorageFileHandle file_handle;
ObStorageFile *file = nullptr;
ObTenantMutilAllocatorMgr::get_instance().init();
if (OB_FAIL(file_handle.assign(OB_FILE_SYSTEM.get_server_root_handle()))) {
STORAGE_LOG(WARN, "fail to assign file handle", K(ret));
} else if (OB_FAIL(file_handle.get_storage_file()->read_super_block(super_block_))) {
STORAGE_LOG(WARN, "failed to read super block", K(ret));
}
return ret;
}
int ObAdminSlogReplayer::replay_slog()
{
int ret = OB_SUCCESS;
common::ObLogCursor checkpoint;
if (OB_UNLIKELY(!svr_addr_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid source server", K(ret), K(svr_addr_));
} else if (OB_FAIL(OB_FILE_SYSTEM.alloc_file(svr_root_.file_))) {
STORAGE_LOG(WARN, "fail to alloc storage file", K(ret));
} else if (OB_FAIL(svr_root_.file_->init(svr_addr_,
ObStorageFile::FileType::SERVER_ROOT))) {
STORAGE_LOG(WARN, "fail to init original server root file", K(ret), K(svr_addr_));
} else if (OB_FAIL(svr_root_.file_->open(ObFileSystemUtil::WRITE_FLAGS))) {
STORAGE_LOG(WARN, "fail to open pg file", K(ret));
} else if (OB_FAIL(svr_root_.file_->read_super_block(super_block_))) {
STORAGE_LOG(WARN, "failed to read super block", K(ret));
} else if (OB_FAIL(read_checkpoint_and_replay_log(checkpoint))) {
STORAGE_LOG(WARN, "fail to read checkpoint and replay log", K(ret));
} else {
STORAGE_LOG(INFO, "success to replay log", K(ret));
}
return ret;
}
void ObAdminSlogReplayer::reset()
{
tenant_file_reader_.reset();
super_block_.reset();
if (OB_NOT_NULL(svr_root_.file_)) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = OB_FILE_SYSTEM.free_file(svr_root_.file_))) {
STORAGE_LOG(WARN, "fail to free svr root", K(tmp_ret));
}
}
}
int ObAdminSlogReplayer::get_sstable(
storage::ObITable::TableKey table_key,
storage::ObSSTable *&sstable)
{
int ret = OB_SUCCESS;
sstable = NULL;
storage::ObIPartitionGroupGuard guard;
storage::ObPGPartitionGuard pg_guard;
ObTablesHandle tables_handle;
common::ObArray<ObSSTable *> sstables;
if (OB_FAIL(pg_mgr_.get_partition(table_key.pkey_, guard)) || NULL == guard.get_partition_group()) {
STORAGE_LOG(WARN, "invalid partition", K(ret), K(table_key));
} else if (OB_FAIL(guard.get_partition_group()->get_pg_storage().get_all_sstables(tables_handle))) {
STORAGE_LOG(WARN, "failed to get tables", K(ret), K(table_key));
} else if (OB_FAIL(tables_handle.get_all_sstables(sstables))) {
STORAGE_LOG(WARN, "failed to get all sstables", K(ret), K(table_key));
} else {
for (int i = 0; i < sstables.count(); ++i) {
if (sstables.at(i)->get_key() == table_key) {
sstable = sstables.at(i);
break;
}
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(sstable)) {
STORAGE_LOG(INFO, "success to get sstable", K(ret), K(table_key), KPC(sstable));
} else {
STORAGE_LOG(WARN, "sstable not found", K(ret), K(table_key), K(sstables));
}
return ret;
}
int ObAdminSlogReplayer::replay_server_slog(
const char *slog_dir,
const common::ObLogCursor &replay_start_cursor,
const ObStorageLogCommittedTransGetter &committed_trans_getter)
{
int ret = OB_SUCCESS;
ObStorageLogReplayer log_replayer;
ServerMetaSLogFilter filter_before_parse;
ObStorageFileHandle svr_root_handle;
if (OB_ISNULL(slog_dir) || OB_UNLIKELY(!replay_start_cursor.is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), KP(slog_dir), K(replay_start_cursor));
} else if (FALSE_IT(svr_root_handle.set_storage_file_with_ref(svr_root_))) {
} else if (OB_FAIL(tenant_file_reader_.read_checkpoint(super_block_.content_.super_block_meta_,
file_mgr_, svr_root_handle))) {
STORAGE_LOG(WARN, "fail to read tenant file checkpoint", K(ret));
} else if (OB_FAIL(log_replayer.init(slog_dir, &filter_before_parse))) {
STORAGE_LOG(WARN, "fail to init log replayer", K(ret));
} else if (OB_FAIL(log_replayer.register_redo_module(OB_REDO_LOG_TENANT_FILE, &file_mgr_))) {
STORAGE_LOG(WARN, "fail to register redo module", K(ret));
} else if (OB_FAIL(log_replayer.replay(replay_start_cursor, committed_trans_getter))) {
STORAGE_LOG(WARN, "fail to replay log replayer", K(ret));
} else if (OB_FAIL(file_mgr_.replay_open_files(svr_addr_))) {
STORAGE_LOG(WARN, "fail to replay open files", K(ret));
} else {
STORAGE_LOG(INFO, "finish replay server slog");
}
return ret;
}
int ObAdminSlogReplayer::replay_pg_slog(const char *slog_dir,
const common::ObLogCursor &replay_start_cursor,
const blocksstable::ObStorageLogCommittedTransGetter &committed_trans_getter,
common::ObLogCursor &checkpoint)
{
int ret = OB_SUCCESS;
ObStorageLogReplayer log_replayer;
PGMetaSLogFilter filter_before_parse(file_mgr_);
if (OB_ISNULL(slog_dir) || OB_UNLIKELY(!replay_start_cursor.is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), KP(slog_dir), K(replay_start_cursor));
} else if (OB_FAIL(pg_meta_reader_.read_checkpoint(file_mgr_, pg_mgr_))) {
STORAGE_LOG(WARN, "fail to read checkpoint", K(ret));
} else if (OB_FAIL(log_replayer.init(slog_dir, &filter_before_parse))) {
STORAGE_LOG(WARN, "fail to init log replayer", K(ret));
} else if (OB_FAIL(log_replayer.register_redo_module(OB_REDO_LOG_PARTITION, &pg_mgr_))) {
STORAGE_LOG(WARN, "fail to register redo module", K(ret));
} else if (OB_FAIL(log_replayer.replay(replay_start_cursor, committed_trans_getter))) {
STORAGE_LOG(WARN, "fail to replay log replayer", K(ret));
} else if (OB_FAIL(log_replayer.get_active_cursor(checkpoint))){
STORAGE_LOG(WARN, "fail to get active cursor", K(ret));
} else {
STORAGE_LOG(INFO, "finish replay pg slog", K(checkpoint), K(pg_mgr_.get_pg_mgr().get_total_partition_count()));
}
return ret;
}
int ObAdminSlogReplayer::read_checkpoint_and_replay_log(common::ObLogCursor &checkpoint)
{
int ret = OB_SUCCESS;
char dir_name[MAX_PATH_SIZE];
common::ObLogCursor replay_start_cursor;
ObStorageLogCommittedTransGetter committed_trans_getter;
ObServerWorkingDir dir(svr_addr_, ObServerWorkingDir::DirStatus::RECOVERING);
if (OB_FAIL(dir.to_path_string(dir_name, sizeof(dir_name)))) {
STORAGE_LOG(WARN, "get server ip port fail", K(ret), K(dir));
} else if (super_block_.content_.replay_start_point_.is_valid()) {
replay_start_cursor = super_block_.content_.replay_start_point_;
} else {
replay_start_cursor.file_id_ = 1;
replay_start_cursor.log_id_ = 0;
replay_start_cursor.offset_ = 0;
}
if (OB_SUCC(ret)) {
STORAGE_LOG(INFO, "read checkpoint begin", K(slog_dir_), K(svr_addr_), K(replay_start_cursor));
if (OB_FAIL(committed_trans_getter.init(slog_dir_, replay_start_cursor))) {
STORAGE_LOG(WARN, "fail to init committed trans getter", K(ret));
} else if (OB_FAIL(replay_server_slog(slog_dir_, replay_start_cursor, committed_trans_getter))) {
STORAGE_LOG(WARN, "fail to replay tenant file slog", K(ret));
} else if (OB_FAIL(replay_pg_slog(slog_dir_, replay_start_cursor, committed_trans_getter, checkpoint))) {
STORAGE_LOG(WARN, "fail to replay pg slog", K(ret));
} else if (OB_FAIL(file_mgr_.replay_over())) {
STORAGE_LOG(WARN, "fail to replay over file mgr", K(ret));
}
STORAGE_LOG(INFO, "read checkpoint end", K(ret), K_(slog_dir), K(svr_addr_),
K(replay_start_cursor), K(checkpoint));
}
return ret;
}
int ObAdminSlogReplayer::ServerMetaSLogFilter::filter(
const ObISLogFilter::Param &param,
bool &is_filtered) const
{
int ret = OB_SUCCESS;
enum ObRedoLogMainType main_type = OB_REDO_LOG_MAX;
int32_t sub_type = 0;
is_filtered = false;
ObIRedoModule::parse_subcmd(param.subcmd_, main_type, sub_type);
is_filtered = (OB_REDO_LOG_TENANT_FILE != main_type);
return ret;
}
int ObAdminSlogReplayer::PGMetaSLogFilter::filter(
const ObISLogFilter::Param &param,
bool &is_filtered) const
{
int ret = OB_SUCCESS;
enum ObRedoLogMainType main_type = OB_REDO_LOG_MAX;
int32_t sub_type = 0;
is_filtered = false;
ObIRedoModule::parse_subcmd(param.subcmd_, main_type, sub_type);
is_filtered = (OB_REDO_LOG_PARTITION != main_type);
if (!is_filtered) {
if (OB_VIRTUAL_DATA_FILE_ID == param.attr_.data_file_id_) {
is_filtered = false;
} else {
ObTenantFileKey file_key(param.attr_.tenant_id_, param.attr_.data_file_id_);
ObTenantFileInfo file_info;
if (OB_FAIL(file_mgr_.get_tenant_file_info(file_key, file_info))) {
if (OB_ENTRY_NOT_EXIST == ret) {
is_filtered = true;
ret = OB_SUCCESS;
}
} else {
is_filtered = !file_info.is_normal_status();
}
}
}
return ret;
}
void ObAdminDumpsstExecutor::print_usage()
{
printf("\n");
printf("Usage: dumpsst command [command args] [options]\n");
printf("commands:\n");
printf(HELP_FMT, "-d,--dump", "dump, args: [super_block|print_macro|macro_block|macro_meta|sstable|sstable_meta]");
printf(HELP_FMT, "-h,--help", "display this message.");
printf("options:\n");
printf(HELP_FMT, "-f,--data-file-name", "data file path");
printf(HELP_FMT, "-a,--macro-id", "macro block index");
printf(HELP_FMT, "-i,--micro-id", "micro block id, -1 means all micro blocks");
printf(HELP_FMT, "-n,--macro-size", "macro block size, in bytes");
printf(HELP_FMT, "-q,--quiet", "log level: ERROR");
printf(HELP_FMT, "-t,--table-key", "table key: table_type,table_id:partition_id,index_id,base_version:multi_version_start:snapshot_version,start_log_ts:end_log_ts:max_log_ts,major_version");
printf(HELP_FMT, "-s,--skip_replay", "skip slog replay, only work for macro_block mode");
printf("samples:\n");
printf(" dump all metas and rows in macro: \n");
printf("\tob_admin dumpsst -d pm -f block_file_path -a macro_id -i micro_id\n");
printf(" dump all rows in macro: \n");
printf("\tob_admin dumpsst -d macro_block -f block_file_path -a macro_id -i micro_id: dump rows in macro\n");
}
} //namespace tools
} //namespace oceanbase

View File

@ -0,0 +1,164 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ADMIN_DUMPSST_EXECUTOR_H_
#define OB_ADMIN_DUMPSST_EXECUTOR_H_
#include "../ob_admin_executor.h"
#include "lib/container/ob_array.h"
#include "share/config/ob_config_manager.h"
#include "storage/blocksstable/ob_block_sstable_struct.h"
#include "storage/blocksstable/ob_macro_block_meta_mgr.h"
#include "storage/blocksstable/ob_store_file.h"
#include "storage/ob_i_table.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_srv_network_frame.h"
#include "observer/omt/ob_worker_processor.h"
#include "observer/omt/ob_multi_tenant.h"
#include "observer/ob_server_reload_config.h"
#include "storage/blocksstable/ob_store_file_system.h"
#include "storage/ob_tenant_file_super_block_checkpoint_reader.h"
#include "storage/ob_server_pg_meta_checkpoint_reader.h"
namespace oceanbase
{
namespace storage
{
class ObBaseFileMgr;
class ObPartitionMetaRedoModule;
}
namespace tools
{
enum ObAdminDumpsstCmd
{
DUMP_MACRO_META,
DUMP_SUPER_BLOCK,
DUMP_MACRO_DATA,
PRINT_MACRO_BLOCK,
DUMP_SSTABLE,
DUMP_SSTABLE_META,
DUMP_MAX,
};
struct ObDumpMacroBlockContext final
{
public:
ObDumpMacroBlockContext()
: macro_id_(-1), micro_id_(-1), tenant_id_(common::OB_INVALID_ID), file_id_(-1)
{}
~ObDumpMacroBlockContext() = default;
bool is_valid() const { return macro_id_ >= 0; }
TO_STRING_KV(K_(macro_id), K_(micro_id), K_(tenant_id), K_(file_id));
int64_t macro_id_;
int64_t micro_id_;
uint64_t tenant_id_;
int64_t file_id_;
};
class ObAdminDumpsstExecutor : public ObAdminExecutor
{
public:
ObAdminDumpsstExecutor();
virtual ~ObAdminDumpsstExecutor();
virtual int execute(int argc, char *argv[]);
private:
int parse_cmd(int argc, char *argv[]);
void print_macro_block();
void print_usage();
void print_macro_meta();
void print_super_block();
int dump_macro_block(const ObDumpMacroBlockContext &context);
void dump_sstable();
void dump_sstable_meta();
int open_store_file();
int load_config();
int replay_slog_to_get_sstable(storage::ObSSTable *&sstable);
blocksstable::ObStorageEnv storage_env_;
char data_dir_[common::OB_MAX_FILE_NAME_LENGTH];
char slog_dir_[common::OB_MAX_FILE_NAME_LENGTH];
char clog_dir_[common::OB_MAX_FILE_NAME_LENGTH];
char ilog_dir_[common::OB_MAX_FILE_NAME_LENGTH];
char clog_shm_path_[common::OB_MAX_FILE_NAME_LENGTH];
char ilog_shm_path_[common::OB_MAX_FILE_NAME_LENGTH];
char sstable_dir_[common::OB_MAX_FILE_NAME_LENGTH];
bool is_quiet_;
bool in_csv_;
ObAdminDumpsstCmd cmd_;
storage::ObITable::TableKey table_key_;
bool skip_log_replay_;
ObDumpMacroBlockContext dump_macro_context_;
observer::ObServerReloadConfig reload_config_;
common::ObConfigManager config_mgr_;
};
class ObAdminSlogReplayer final
{
public:
explicit ObAdminSlogReplayer(storage::ObBaseFileMgr &file_mgr,
storage::ObPartitionMetaRedoModule &pg_mgr,
char *slog_dir);
virtual ~ObAdminSlogReplayer();
int replay_slog();
int init();
void reset();
int get_sstable(storage::ObITable::TableKey table_key, storage::ObSSTable *&sstable);
private:
int read_checkpoint_and_replay_log(common::ObLogCursor &checkpoint);
int replay_server_slog(
const char *slog_dir,
const common::ObLogCursor &replay_start_cursor,
const blocksstable::ObStorageLogCommittedTransGetter &committed_trans_getter);
int replay_pg_slog(const char *slog_dir,
const common::ObLogCursor &replay_start_cursor,
const blocksstable::ObStorageLogCommittedTransGetter &committed_trans_getter,
common::ObLogCursor &checkpoint);
private:
class ServerMetaSLogFilter : public blocksstable::ObISLogFilter
{
public:
ServerMetaSLogFilter() = default;
virtual ~ServerMetaSLogFilter() = default;
virtual int filter(const ObISLogFilter::Param &param, bool &is_filtered) const override;
};
class PGMetaSLogFilter : public blocksstable::ObISLogFilter
{
public:
explicit PGMetaSLogFilter(storage::ObBaseFileMgr &file_mgr) : file_mgr_(file_mgr) {};
virtual ~PGMetaSLogFilter() = default;
virtual int filter(const ObISLogFilter::Param &param, bool &is_filtered) const override;
private:
storage::ObBaseFileMgr &file_mgr_;
};
const common::ObAddr svr_addr_;
blocksstable::ObStorageFileWithRef svr_root_;
storage::ObBaseFileMgr &file_mgr_;
storage::ObPartitionMetaRedoModule &pg_mgr_;
blocksstable::ObServerSuperBlock super_block_;
storage::ObServerPGMetaCheckpointReader pg_meta_reader_;
storage::ObTenantFileSuperBlockCheckpointReader tenant_file_reader_;
char *slog_dir_;
DISALLOW_COPY_AND_ASSIGN(ObAdminSlogReplayer);
};
} //namespace tools
} //namespace oceanbase
#endif /* OB_ADMIN_DUMPSST_EXECUTOR_H_ */

View File

@ -0,0 +1,338 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_admin_dumpsst_print_helper.h"
namespace oceanbase
{
using namespace storage;
using namespace common;
using namespace blocksstable;
namespace tools
{
const std::string PrintHelper::term(",");
const std::string PrintHelper::trans_term("\\,");
#define FPRINTF(args...) fprintf(stderr, ##args)
#define P_BAR() FPRINTF("|")
#define P_DASH() FPRINTF("------------------------------")
#define P_END_DASH() FPRINTF("--------------------------------------------------------------------------------")
#define P_NAME(key) FPRINTF("%s", (key))
#define P_NAME_FMT(key) FPRINTF("%30s", (key))
#define P_VALUE_STR_B(key) FPRINTF("[%s]", (key))
#define P_VALUE_INT_B(key) FPRINTF("[%d]", (key))
#define P_VALUE_BINT_B(key) FPRINTF("[%ld]", (key))
#define P_VALUE_STR(key) FPRINTF("%s", (key))
#define P_VALUE_INT(key) FPRINTF("%d", (key))
#define P_VALUE_BINT(key) FPRINTF("%ld", (key))
#define P_NAME_BRACE(key) FPRINTF("{%s}", (key))
#define P_COLOR(color) FPRINTF(color)
#define P_LB() FPRINTF("[")
#define P_RB() FPRINTF("]")
#define P_LBRACE() FPRINTF("{")
#define P_RBRACE() FPRINTF("}")
#define P_COLON() FPRINTF(":")
#define P_COMMA() FPRINTF(",")
#define P_TAB() FPRINTF("\t")
#define P_LINE_NAME(key) FPRINTF("%30s", (key))
#define P_LINE_VALUE(key, type) P_LINE_VALUE_##type((key))
#define P_LINE_VALUE_INT(key) FPRINTF("%-30d", (key))
#define P_LINE_VALUE_BINT(key) FPRINTF("%-30ld", (key))
#define P_LINE_VALUE_STR(key) FPRINTF("%-30s", (key))
#define P_END() FPRINTF("\n")
#define P_TAB_LEVEL(level) \
do { \
for (int64_t i = 1; i < (level); ++i) { \
P_TAB(); \
if (i != level - 1) P_BAR(); \
} \
} while (0)
#define P_DUMP_LINE(type) \
do { \
P_TAB_LEVEL(level); \
P_BAR(); \
P_LINE_NAME(name); \
P_BAR(); \
P_LINE_VALUE(value, type); \
P_END(); \
} while (0)
#define P_DUMP_LINE_COLOR(type) \
do { \
P_COLOR(LIGHT_GREEN); \
P_TAB_LEVEL(level); \
P_BAR(); \
P_COLOR(CYAN); \
P_LINE_NAME(name); \
P_BAR(); \
P_COLOR(NONE_COLOR); \
P_LINE_VALUE(value, type); \
P_END(); \
} while (0)
void PrintHelper::print_dump_title(const char *title, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_COLOR(LIGHT_GREEN);
}
P_TAB_LEVEL(level);
P_DASH();
P_NAME_BRACE(title);
P_DASH();
P_END();
}
void PrintHelper::print_end_line(const int64_t level)
{
if (isatty(fileno(stderr))) {
P_COLOR(LIGHT_GREEN);
}
P_TAB_LEVEL(level);
P_END_DASH();
P_END();
if (isatty(fileno(stderr))) {
P_COLOR(NONE_COLOR);
}
}
void PrintHelper::print_dump_title(const char *title, const int64_t &value, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_COLOR(LIGHT_GREEN);
}
P_TAB_LEVEL(level);
P_DASH();
P_LBRACE();
P_NAME(title);
P_VALUE_BINT_B(value);
P_RBRACE();
P_DASH();
P_END();
if (isatty(fileno(stderr))) {
P_COLOR(NONE_COLOR);
}
}
void PrintHelper::print_dump_line(const char *name, const uint32_t &value, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_DUMP_LINE_COLOR(INT);
} else {
P_DUMP_LINE(INT);
}
}
void PrintHelper::print_dump_line(const char *name, const uint64_t &value, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_DUMP_LINE_COLOR(BINT);
} else {
P_DUMP_LINE(BINT);
}
}
void PrintHelper::print_dump_line(const char *name, const int32_t &value, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_DUMP_LINE_COLOR(INT);
} else {
P_DUMP_LINE(INT);
}
}
void PrintHelper::print_dump_line(const char *name, const int64_t &value, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_DUMP_LINE_COLOR(BINT);
} else {
P_DUMP_LINE(BINT);
}
}
void PrintHelper::print_dump_line(const char *name, const char *value, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_DUMP_LINE_COLOR(STR);
} else {
P_DUMP_LINE(STR);
}
}
void PrintHelper::print_dump_list_start(const char *name, const int64_t level)
{
if (isatty(fileno(stderr))) {
P_COLOR(LIGHT_GREEN);
P_TAB_LEVEL(level);
P_BAR();
P_COLOR(CYAN);
P_NAME_FMT(name);
P_BAR();
P_COLOR(NONE_COLOR);
P_LB();
} else {
P_TAB_LEVEL(level);
P_BAR();
P_NAME_FMT(name);
P_BAR();
P_LB();
}
}
void PrintHelper::print_dump_list_value(const int64_t &value, const bool is_last)
{
P_VALUE_BINT(value);
if (!is_last) {
P_COMMA();
}
}
void PrintHelper::print_dump_list_value(const int32_t &value, const bool is_last)
{
P_VALUE_INT(value);
if (!is_last) {
P_COMMA();
}
}
void PrintHelper::print_dump_list_value(const char *value, const bool is_last)
{
P_VALUE_STR(value);
if (!is_last) {
P_COMMA();
}
}
void PrintHelper::print_dump_list_end()
{
if (isatty(fileno(stderr))) {
P_RB();
P_END();
P_COLOR(NONE_COLOR);
} else {
P_RB();
P_END();
}
}
void PrintHelper::print_dump_cols_info_start(const char *n1, const char *n2, const char *n3, const char *n4)
{
if (isatty(fileno(stderr))) {
P_COLOR(LIGHT_GREEN);
}
FPRINTF("--------{%-15s %15s %15s %15s}----------\n", n1, n2, n3, n4);
}
void PrintHelper::print_dump_cols_info_line(const int32_t &v1, const char *v2, const int64_t &v3, const int64_t &v4)
{
if (isatty(fileno(stderr))) {
P_COLOR(LIGHT_GREEN);
P_BAR();
P_COLOR(NONE_COLOR);
FPRINTF("\t[%-15d %15s %15ld %15ld]\n", v1, v2, v3, v4);
} else {
P_BAR();
FPRINTF("\t[%-15d %15s %15ld %15ld]\n", v1, v2, v3, v4);
}
}
void PrintHelper::print_row_title(const bool use_csv, const ObStoreRow *row, const int64_t row_index)
{
// print title
if (!use_csv) {
if (isatty(fileno(stderr))) {
P_COLOR(LIGHT_GREEN);
P_BAR();
P_COLOR(CYAN);
P_NAME("ROW");
P_VALUE_BINT_B(row_index);
P_COLON();
if (OB_NOT_NULL(row->trans_id_ptr_)) {
P_NAME("trans_id=");
P_VALUE_BINT_B(row->trans_id_ptr_->hash());
P_COMMA();
} else {
P_NAME("trans_id=");
P_VALUE_INT_B(0);
P_COMMA();
}
P_NAME("flag=");
P_VALUE_BINT_B(row->flag_);
P_COMMA();
P_NAME("first_dml=");
P_VALUE_INT_B(row->get_first_dml());
P_COMMA();
P_NAME("dml=");
P_VALUE_INT_B(row->get_dml());
P_COMMA();
P_NAME("multi_version_row_flag=");
P_VALUE_INT_B(row->row_type_flag_.flag_);
P_BAR();
P_COLOR(NONE_COLOR);
} else {
P_BAR();
P_NAME("ROW");
P_VALUE_BINT_B(row_index);
P_COLON();
if (OB_NOT_NULL(row->trans_id_ptr_)) {
P_NAME("trans_id=");
P_VALUE_BINT_B(row->trans_id_ptr_->hash());
P_COMMA();
} else {
P_NAME("trans_id=");
P_VALUE_INT_B(0);
P_COMMA();
}
P_NAME("flag=");
P_VALUE_BINT_B(row->flag_);
P_COMMA();
P_NAME("first_dml=");
P_VALUE_INT_B(row->get_first_dml());
P_COMMA();
P_NAME("dml=");
P_VALUE_INT_B(row->get_dml());
P_COMMA();
P_NAME("multi_version_row_flag=");
P_VALUE_INT_B(row->row_type_flag_.flag_);
P_BAR();
}
}
}
void PrintHelper::print_cell(const ObObj &cell, const bool use_csv)
{
if (!use_csv) {
P_VALUE_STR_B(to_cstring(cell));
} else {
int64_t pos = 0;
char buf[MAX_BUF_SIZE];
cell.print_sql_literal(buf, MAX_BUF_SIZE, pos);
P_NAME(buf);
P_COMMA();
}
}
void PrintHelper::replace_all(std::string &str, const std::string &from, const std::string &to)
{
if(from.empty())
return;
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length();
}
}
}
}

View File

@ -0,0 +1,71 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <string.h>
#include <stdint.h>
#include "storage/blocksstable/ob_block_sstable_struct.h"
#define NONE_COLOR "\033[m"
#define RED "\033[0;32;31m"
#define LIGHT_RED "\033[1;31m"
#define GREEN "\033[0;32;32m"
#define LIGHT_GREEN "\033[1;32m"
#define BLUE "\033[0;32;34m"
#define LIGHT_BLUE "\033[1;34m"
#define DARY_GRAY "\033[1;30m"
#define CYAN "\033[0;36m"
#define LIGHT_CYAN "\033[1;36m"
#define PURPLE "\033[0;35m"
#define LIGHT_PURPLE "\033[1;35m"
#define BROWN "\033[0;33m"
#define YELLOW "\033[1;33m"
#define LIGHT_GRAY "\033[0;37m"
//#define WHITE "\033[1;37m"
namespace oceanbase
{
namespace tools
{
static const int64_t MAX_BUF_SIZE = 65 * 1024L;
class PrintHelper
{
public:
static const std::string term;
static const std::string trans_term;
static void print_dump_title(const char *name, const int64_t &value, const int64_t level = 1);
static void print_dump_title(const char *name, const int64_t level = 1);
static void print_dump_line(const char *name, const int32_t &value, const int64_t level = 1);
static void print_dump_line(const char *name, const int64_t &value, const int64_t level = 1);
static void print_dump_line(const char *name, const uint32_t &value, const int64_t level = 1);
static void print_dump_line(const char *name, const uint64_t &value, const int64_t level = 1);
static void print_dump_line(const char *name, const char *value, const int64_t level = 1);
static void print_row_title(const bool use_csv, const storage::ObStoreRow *row, const int64_t row_index);
static void print_cell(const common::ObObj &cell, const bool use_csv);
static void print_end_line(const int64_t level = 1);
static void print_dump_list_start(const char *name, const int64_t level = 1);
static void print_dump_list_end();
static void print_dump_list_value(const int64_t &value, const bool is_last);
static void print_dump_list_value(const int32_t &value, const bool is_last);
static void print_dump_list_value(const char *value, const bool is_last);
static void print_dump_cols_info_start(const char *n1, const char *n2,
const char *n3, const char *n4);
static void print_dump_cols_info_line(const int32_t &v1, const char *v2,
const int64_t &v3, const int64_t & v4);
private:
static void replace_all(std::string &str, const std::string &from, const std::string &to);
};
}
}

View File

@ -0,0 +1,929 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#include "ob_admin_dumpsst_utils.h"
#include "share/ob_define.h"
#include "lib/utility/utility.h"
#include "lib/oblog/ob_log_module.h"
#include "lib/compress/ob_compressor.h"
#include "lib/compress/ob_compressor_pool.h"
#include "storage/blocksstable/ob_row_reader.h"
#include "storage/ob_i_store.h"
namespace oceanbase
{
using namespace common;
using namespace share;
using namespace serialization;
using namespace blocksstable;
using namespace storage;
namespace tools
{
int parse_string(const char* src, const char del, const char* dst[], int64_t& size)
{
int ret = OB_SUCCESS;
int64_t obj_index = 0;
char* str = (char*) ob_malloc(OB_MAX_FILE_NAME_LENGTH, ObModIds::TEST);
strncpy(str, src, strlen(src) + 1);
char* st_ptr = str;
char* et_ptr = NULL; //use for strchar,the fist position of del in src
char* last_ptr = str + strlen(str) - 1; //end of the str
//skip white space
while (0 != *st_ptr && ' ' == *st_ptr)
++st_ptr;
//parse rowkey_str from st_str division by ';' ,except the last item
while (NULL != st_ptr && NULL != (et_ptr = strchr(st_ptr, del))) {
*et_ptr = 0;
if (size <= obj_index) {
ret = OB_SIZE_OVERFLOW;
break;
} else {
dst[obj_index++] = st_ptr;
st_ptr = et_ptr + 1;
while (0 != *st_ptr && ' ' == *st_ptr)
++st_ptr;
if (size - 1 == obj_index) {
dst[obj_index++] = st_ptr;
break;
}
}
}
if (OB_SUCCESS == ret && 0 != *st_ptr && size > obj_index) {
//skip white space of end
while (last_ptr > st_ptr && ' ' == *last_ptr) {
*last_ptr = 0;
--last_ptr;
}
dst[obj_index++] = st_ptr;
}
if (OB_SUCC(ret)) {
size = obj_index;
}
return ret;
}
int parse_table_key(const char *str, ObITable::TableKey &table_key)
{
int ret = OB_SUCCESS;
int pret = 0;
int64_t tmp = 0;
const int64_t TABLE_KEY_ATTR_CNT = 6;
int64_t size = TABLE_KEY_ATTR_CNT;
const char *dst[TABLE_KEY_ATTR_CNT];
if (OB_FAIL(parse_string(str, ',', dst, size))) {
LOG_WARN("failed to parse string", K(ret));
}
if (OB_SUCC(ret)) {
// parse table type
pret = sscanf(dst[0], "%ld", &tmp);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("table type is not valid");
} else {
table_key.table_type_ = (ObITable::TableType)tmp;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(parse_partition_key(dst[1], table_key.pkey_))) {
LOG_WARN("failed to parse partition key", K(ret));
}
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[2], "%lu", &table_key.table_id_);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("index id is not valid");
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(parse_version_range(dst[3], table_key.trans_version_range_))) {
LOG_WARN("failed to parse version range");
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(parse_log_ts_range(dst[4], table_key.log_ts_range_))) {
LOG_WARN("failed to parse log ts range");
}
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[5], "%ld", &tmp);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("major version is not valid");
} else {
table_key.version_.version_ = tmp;
}
}
if (OB_SUCC(ret)) {
LOG_INFO("success to parse table key", K(table_key));
}
return ret;
}
int parse_partition_key(const char *str, ObPartitionKey &pkey)
{
int ret = OB_SUCCESS;
int pret = 0;
int64_t size = 2;
const char *dst[2];
uint64_t table_id = 0;
int64_t part_id = 0;
if (OB_FAIL(parse_string(str, ':', dst, size))) {
STORAGE_LOG(ERROR, "failed to parse string", K(ret));
} else if (2 != size) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(ERROR, "table id and partition id should be provided", K(ret), K(str));
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[0], "%lu", &table_id);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("table id is not valid");
}
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[1], "%ld", &part_id);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("partition id is not valid");
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(pkey.init(table_id, part_id, 0))) {
STORAGE_LOG(ERROR, "faile to init pkey", K(ret), K(table_id), K(part_id));
}
}
return ret;
}
int parse_version_range(const char *str, ObVersionRange &version_range)
{
int ret = OB_SUCCESS;
int pret = 0;
int64_t size = 3;
const char *dst[3];
if (OB_FAIL(parse_string(str, ':', dst, size))) {
STORAGE_LOG(ERROR, "failed to parse string", K(ret));
} else if (3 != size) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(ERROR, "version range has three member variables", K(ret), K(str));
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[0], "%ld", &version_range.base_version_);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("base version is not valid");
}
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[1], "%ld", &version_range.multi_version_start_);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("multi_version_start is not valid");
}
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[2], "%ld", &version_range.snapshot_version_);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("snapshot_version is not valid");
}
}
return ret;
}
int parse_log_ts_range(const char *str, ObLogTsRange &log_ts_range)
{
int ret = OB_SUCCESS;
int pret = 0;
int64_t size = 3;
const char *dst[3];
if (OB_FAIL(parse_string(str, ':', dst, size))) {
STORAGE_LOG(ERROR, "failed to parse string", K(ret));
} else if (3 != size) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(ERROR, "version range has three member variables", K(ret), K(str));
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[0], "%ld", &log_ts_range.start_log_ts_);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("start_log_ts is not valid");
}
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[1], "%ld", &log_ts_range.end_log_ts_);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("end_log_ts is not valid");
}
}
if (OB_SUCC(ret)) {
pret = sscanf(dst[2], "%ld", &log_ts_range.max_log_ts_);
if (pret != 1) {
ret = OB_INVALID_ARGUMENT;
printf("max_log_ts is not valid");
}
}
return ret;
}
static const char* obj_type_name[] = { "ObNullType", "ObTinyIntType", "ObSmallIntType",
"ObMediumIntType", "ObInt32Type", "ObIntType",
"ObUTinyIntType", "ObUSmallIntType", "ObUMediumIntType",
"ObUInt32Type", "ObUInt64Type", "ObFloatType",
"ObDoubleType", "ObUFloatType", "ObUDoubleType",
"ObNumberType", "ObUNumberType", "ObDateTimeType",
"ObTimestampType", "ObDateType", "ObTimeType", "ObYearType",
"ObVarcharType", "ObCharType", "ObHexStringType",
"ObExtendType", "ObUnknowType", "ObTinyTextType",
"ObTextType", "ObMediumTextType", "ObLongTextType", "ObBitType"};
int MacroBlock::setup(const char *data, const int64_t size, const int64_t macro_id)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
if (OB_ISNULL(data)
|| OB_UNLIKELY(size <= 0)
|| OB_UNLIKELY(macro_id < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(data), K(size), K(macro_id));
} else {
data_ = data;
size_ = size;
macro_id_ = macro_id;
if (OB_FAIL(common_header_.deserialize(data_, size_, pos))) {
LOG_ERROR("deserialize common header fail", K(ret), KP(data), K(size), K(pos));
} else if (OB_FAIL(common_header_.check_integrity())) {
LOG_ERROR("invalid common header", K(ret), K_(common_header));
}
}
return ret;
}
int MicroBlock::setup(const char *data, const int64_t size, const int64_t micro_id,
const ObColumnMap *column_map, const int64_t row_store_type,
const uint16_t *column_id_list, const int64_t column_cnt)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(data)
|| OB_UNLIKELY(size <= 0)
|| OB_UNLIKELY(micro_id < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(data), K(size), K(micro_id));
} else {
data_ = data;
size_ = size;
micro_id_ = micro_id;
row_store_type_ = row_store_type;
column_map_ = column_map;
column_cnt_ = column_cnt;
column_id_list_ = column_id_list;
}
return ret;
}
int MacroBlockReader::init(const char *data, const int64_t size, const int64_t macro_id,
const ObMacroBlockMeta *macro_meta)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_ISNULL(data) || size <= 0 || macro_id <= 0 || OB_ISNULL(macro_meta)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(data), K(size), K(macro_id), KP(macro_meta));
} else if (OB_FAIL(macro_block_.setup(data, size, macro_id))) {
STORAGE_LOG(ERROR, "failed to setup macro block", K(ret), KP(data), K(size), K(macro_id));
} else {
macro_meta_ = macro_meta;
int64_t pos = 0;
// parse headers
pos += macro_block_.common_header_.get_serialize_size();
sstable_header_ = reinterpret_cast<const ObSSTableMacroBlockHeader*>(macro_block_.data_ + pos);
pos += sizeof(ObSSTableMacroBlockHeader);
const int64_t column_cnt = sstable_header_->column_count_;
column_id_list_ = reinterpret_cast<const uint16_t*>(macro_block_.data_ + pos);
pos += sizeof(uint16_t) * column_cnt;
column_type_list_ = reinterpret_cast<const common::ObObjMeta*>(macro_block_.data_ + pos);
pos += sizeof(common::ObObjMeta) * column_cnt;
column_checksum_ = reinterpret_cast<const int64_t*>(macro_block_.data_ + pos);
pos += sizeof(int64_t) * column_cnt;
if (OB_FAIL(parse_micro_block_index())) {
LOG_WARN("failed to parse micro index", K(ret));
} else if (OB_FAIL(build_column_map())) {
LOG_WARN("failed to build column map", K(ret));
} else {
is_inited_ = true;
}
}
return ret;
}
void MacroBlockReader::reset()
{
macro_block_.reset();
macro_meta_ = NULL;
sstable_header_ = NULL;
column_id_list_ = NULL;
column_type_list_ = NULL;
column_checksum_ = NULL;
curr_micro_block_id_ = -1;
curr_micro_block_.reset();
column_map_.reuse();
micro_index_pos_.reset();
micro_index_keys_.reset();
micro_mark_deletions_.reset();
cur_mib_rh_ = NULL;
is_inited_ = false;
}
int64_t MacroBlockReader::count() const
{
return sstable_header_->micro_block_count_;
}
int MacroBlockReader::set_index(const int64_t index)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(index < 0)
|| OB_UNLIKELY(index >= count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(index));
} else {
curr_micro_block_id_ = index;
if (OB_FAIL(set_current_micro_block())) {
if (OB_ITER_END != ret) {
LOG_WARN("failed to set current micro block", K(ret));
}
}
}
return ret;
}
int MacroBlockReader::get_value(const MicroBlock *&micro_block)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
micro_block = NULL;
if (curr_micro_block_id_ >= micro_index_pos_.count()) {
ret = OB_ITER_END;
} else {
micro_block = &curr_micro_block_;
}
}
return ret;
}
int64_t MacroBlockReader::get_index() const
{
return curr_micro_block_id_;
}
int MacroBlockReader::set_current_micro_block()
{
int ret = OB_SUCCESS;
const char *out = NULL;
int64_t outsize = 0;
const char* compressor_name = sstable_header_->compressor_name_;
if (curr_micro_block_id_ >= micro_index_pos_.count()) {
ret = OB_ITER_END;
} else {
ObPosition datapos = micro_index_pos_.at(curr_micro_block_id_);
if (OB_FAIL(get_micro_block_payload_buffer(compressor_name,
macro_block_.data_ + sstable_header_->micro_block_data_offset_ + datapos.offset_,
datapos.length_, out, outsize, cur_mib_rh_))) {
LOG_WARN("failed to read micro block", K(ret), K(curr_micro_block_id_), K(macro_block_.macro_id_));
} else {
//cur_mib_payload_ = out;
//cur_mib_payload_size_ = outsize;
curr_micro_block_.reset();
if (OB_FAIL(curr_micro_block_.setup(out,
outsize,
curr_micro_block_id_,
&column_map_,
sstable_header_->row_store_type_,
column_id_list_,
sstable_header_->column_count_))) {
LOG_WARN("failed to setup curr micro block", K(ret), K(curr_micro_block_id_));
}
}
}
return ret;
}
int MacroBlockReader::get_micro_block_payload_buffer(const char* compressor_name, const char* buf,
const int64_t size, const char*& out, int64_t& outsize, const blocksstable::ObRecordHeaderV3 *&rh)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
int64_t max_size = size;
char *uncompressed_buf = NULL;
rh = reinterpret_cast<const ObRecordHeaderV3*>(buf);
pos += sizeof(ObRecordHeaderV3);
if (rh->data_length_ != rh->data_zlength_) {
ObCompressor* compressor = NULL;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(compressor_name, compressor))) {
LOG_WARN("get compressor failed", K(ret));
} else if (OB_FAIL(compressor->get_max_overflow_size(rh->data_length_, max_size))) {
LOG_WARN("get max overflow size failed", K(ret));
} else if ((NULL == uncompressed_buf) && (NULL == (uncompressed_buf =
reinterpret_cast<char*>(ob_malloc(max_size + rh->data_length_, ObModIds::OB_SSTABLE_AIO))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory for uncompress buf", K(ret), K(size));
} else if (OB_FAIL(compressor->decompress(buf + pos, rh->data_zlength_,
uncompressed_buf, max_size + rh->data_length_, outsize))) {
LOG_WARN("failed to decompress data", K(ret), K(rh->data_zlength_), K(rh->data_length_), K(max_size));
} else if (outsize != rh->data_length_) {
ret = OB_ERROR;
LOG_WARN("decompress error", K(ret), "expect_size", rh->data_length_, "real_size", outsize);
} else {
out = uncompressed_buf;
}
} else {
out = reinterpret_cast<const char*>(buf) + pos;
outsize = rh->data_length_;
}
return ret;
}
int MacroBlockReader::parse_micro_block_index()
{
int ret = OB_SUCCESS;
const char *index_ptr = NULL;
int32_t index_size = 0;
const char* endkey_ptr = NULL;
blocksstable::ObPosition data_pos;
blocksstable::ObPosition key_pos;
micro_index_pos_.reuse();
micro_index_keys_.reuse();
micro_mark_deletions_.reuse();
index_ptr = macro_block_.data_ + sstable_header_->micro_block_index_offset_;
index_size = sstable_header_->micro_block_index_size_;
UNUSED(index_size);
endkey_ptr = macro_block_.data_ + sstable_header_->micro_block_endkey_offset_;
UNUSED(endkey_ptr);
for (int32_t i = 0; OB_SUCC(ret) && i < sstable_header_->micro_block_count_; ++i) {
parse_one_micro_block_index(index_ptr, i, data_pos, key_pos);
if (OB_FAIL(micro_index_pos_.push_back(data_pos))) {
LOG_WARN("failed to push back micro index pos", K(ret), K(data_pos));
} else if (OB_FAIL(micro_index_keys_.push_back(key_pos))) {
LOG_WARN("failed to push back micro index key", K(ret), K(key_pos));
}
}
if (OB_SUCC(ret)) {
const char *deletion_ptr = macro_block_.data_ + macro_meta_->micro_block_mark_deletion_offset_;
for (int64_t i = 0; OB_SUCC(ret) && i < sstable_header_->micro_block_count_; ++i) {
uint8_t mark_deletion = deletion_ptr[i];
LOG_INFO("micro block mark deletion info", K(mark_deletion), K(i));
if (OB_FAIL(micro_mark_deletions_.push_back(mark_deletion > 0 ? true : false))) {
LOG_WARN("fail to push back micro mark deletion", K(ret));
}
}
}
return ret;
}
void MacroBlockReader::parse_one_micro_block_index(const char* index_ptr,
const int64_t idx, ObPosition& datapos, ObPosition& keypos)
{
int32_t off1[2];
int32_t off2[2];
off1[0] = *reinterpret_cast<const int32_t*>(index_ptr + sizeof(int32_t) * 2 * idx);
off1[1] = *reinterpret_cast<const int32_t*>(index_ptr + sizeof(int32_t) * 2 * idx + sizeof(int32_t));
off2[0] = *reinterpret_cast<const int32_t*>(index_ptr + sizeof(int32_t) * 2 * (idx + 1));
off2[1] = *reinterpret_cast<const int32_t*>(index_ptr + sizeof(int32_t) * 2 * (idx + 1) + sizeof(int32_t));
datapos.offset_ = off1[0];
datapos.length_ = off2[0] - off1[0];
keypos.offset_ = off1[1];
keypos.length_ = off2[1] - off1[1];
}
int MacroBlockReader::build_column_map()
{
int ret = OB_SUCCESS;
hash::ObArrayIndexHashSet<const uint16_t*, uint16_t> column_hash;
column_hash.reset();
column_hash.init(column_id_list_);
for (int64_t i = 0; OB_SUCC(ret) && i < sstable_header_->column_count_; ++i) {
if (OB_FAIL(column_hash.set_index(i))) {
LOG_WARN("column hash set index failed", K(ret), K(i));
}
}
uint64_t idx = 0;
if (OB_SUCC(ret)) {
column_map_.reuse();
ObArray<share::schema::ObColDesc> out_cols;
ObArray<int32_t> projector;
for (int64_t i = 0; OB_SUCC(ret) && i < sstable_header_->column_count_; ++i) {
ret = column_hash.get_index(static_cast<uint16_t>(column_id_list_[i]), idx);
if (OB_ENTRY_NOT_EXIST == ret) {
idx = -1;
ret = OB_SUCCESS;
}
share::schema::ObColDesc col;
col.col_id_ = column_id_list_[i];
col.col_type_ = column_type_list_[i];
if (OB_FAIL(ret)) {
} else if (OB_FAIL(out_cols.push_back(col))) {
LOG_WARN("fail to add column to out column array", K(ret), K(col));
} else if (OB_FAIL(projector.push_back(static_cast<int32_t>(idx)))) {
LOG_WARN("fail to add index to projector", K(ret), K(idx));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(column_map_.init(allocator_,
macro_meta_->schema_version_,
sstable_header_->rowkey_column_count_,
sstable_header_->column_count_,
out_cols,
nullptr,
&projector))) {
LOG_WARN("failed to init column map", K(ret), K(macro_meta_->schema_version_),
K(sstable_header_->rowkey_column_count_), K(sstable_header_->column_count_), K(out_cols), K(projector));
}
}
return ret;
}
int MacroBlockReader::dump_header()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
dump_common_header(&macro_block_.common_header_);
dump_sstable_header(sstable_header_);
// dump column id, type, checksum list;
if (sstable_header_->column_count_ > 0) {
PrintHelper::print_dump_cols_info_start("column_id", "column_type", "column_checksum", "collation type");
for (int64_t i = 0; i < sstable_header_->column_count_; ++i) {
PrintHelper::print_dump_cols_info_line(*(column_id_list_ + i),
obj_type_name[(column_type_list_ + i)->get_type()], *(column_checksum_ + i),
(column_type_list_ + i)->get_collation_type());
}
}
PrintHelper::print_end_line();
// dump micro_block_index;
ObObj objs[sstable_header_->rowkey_column_count_];
const char *endkey_ptr = macro_block_.data_ + sstable_header_->micro_block_endkey_offset_;
int64_t endkey_ptr_pos = 0;
for (int32_t i = 0; i < sstable_header_->micro_block_count_; ++i) {
parse_micro_block_key(endkey_ptr,
endkey_ptr_pos,
endkey_ptr_pos + micro_index_keys_.at(i).length_,
column_type_list_,
objs,
sstable_header_->rowkey_column_count_);
ObRowkey endkey(objs, sstable_header_->rowkey_column_count_);
dump_micro_index(endkey, micro_index_pos_.at(i), micro_index_keys_.at(i), micro_mark_deletions_.at(i), i);
}
}
return ret;
}
int MacroBlockReader::parse_micro_block_key(const char* buf, int64_t& pos, int64_t row_end_pos,
const ObObjMeta* type_list, ObObj* objs, int32_t rowkey_count)
{
int ret = OB_SUCCESS;
ObFlatRowReader reader;
ObNewRow row;
row.cells_ = objs;
row.count_ = rowkey_count;
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = reader.read_compact_rowkey(type_list, rowkey_count, buf /*+ pos*/, row_end_pos, pos, row))) {
fprintf(stderr, "fail to read row."
"ret = %d, pos = %ld, end_pos = %ld\n",
ret, pos, row_end_pos);
} else {
pos = row_end_pos;
}
}
return ret;
}
void MacroBlockReader::dump_micro_index(const ObRowkey& endkey, const blocksstable::ObPosition& datapos,
const blocksstable::ObPosition& keypos, const bool mark_deletion, int32_t num)
{
PrintHelper::print_dump_title("Micro Index", num * 1L, 1);
PrintHelper::print_dump_line("endkey", to_cstring(endkey));
PrintHelper::print_dump_line("datapos offset", datapos.offset_);
PrintHelper::print_dump_line("datapos length", datapos.length_);
PrintHelper::print_dump_line("keypos offset", keypos.offset_);
PrintHelper::print_dump_line("keypos length", keypos.length_);
PrintHelper::print_dump_line("mark deletion", mark_deletion);
PrintHelper::print_end_line();
}
int MacroBlockReader::dump(const int64_t micro_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(dump_header())) {
LOG_WARN("failed to dump header", K(ret));
} else if (micro_id < ALL_MINOR_INDEX) {
LOG_WARN("invalid micro id", K(ret), K(micro_id));
} else {
const MicroBlock *micro_block = NULL;
MicroBlockReader micro_reader;
for (int64_t i = 0; OB_SUCC(ret) && i < count(); ++i) {
if (micro_id == ALL_MINOR_INDEX || micro_id == i) {
micro_block = NULL;
micro_reader.reset();
if (OB_FAIL(set_index(i))) {
LOG_WARN("failed to set index", K(ret), K(i));
} else if (OB_FAIL(get_value(micro_block))) {
LOG_WARN("failed to get micro block", K(ret));
} else if (OB_ISNULL(micro_block)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("micro block is NULL", K(ret));
} else if (OB_FAIL(micro_reader.init(*micro_block))) {
LOG_WARN("failed to setup micro reader", K(ret));
} else if (OB_FAIL(micro_reader.dump(-1))) {
LOG_WARN("dump micro block failed", K(ret));
}
}
}
}
return ret;
}
int MicroBlockReader::init(const MicroBlock &micro_block)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_UNLIKELY(!micro_block.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(micro_block));
} else {
micro_block_ = micro_block;
if (FLAT_ROW_STORE == micro_block_.row_store_type_) {
micro_reader_ = &flat_micro_reader_;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected row store type", K(ret), "type", micro_block_.row_store_type_);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(micro_reader_->init(micro_block))) {
LOG_WARN("failed to init micro reader", K(ret), K(micro_block));
} else {
is_inited_ = true;
}
}
}
return ret;
}
int64_t MicroBlockReader::count() const
{
return micro_reader_ == NULL ? 0 : micro_reader_->count();
}
int MicroBlockReader::set_index(const int64_t index)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(micro_reader_->set_index(index))) {
LOG_WARN("failed to set index", K(ret), K(index));
}
return ret;
}
int64_t MicroBlockReader::get_index() const
{
return micro_reader_ == NULL ? -1 : micro_reader_->get_index();
}
int MicroBlockReader::get_value(const storage::ObStoreRow *&value)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(micro_reader_->get_value(value))) {
LOG_WARN("failed to get value", K(ret));
}
return ret;
}
void MicroBlockReader::reset()
{
micro_block_.reset();
micro_reader_ = NULL;
flat_micro_reader_.reset();
is_inited_ = false;
}
int MicroBlockReader::dump(const int64_t)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(micro_reader_->dump(-1))) {
LOG_WARN("failed to dump", K(ret));
}
return ret;
}
int FlatMicroBlockReader::init(const MicroBlock &micro_block)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_UNLIKELY(!micro_block.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(micro_block));
} else {
micro_block_ = micro_block;
micro_block_header_ = reinterpret_cast<const ObMicroBlockHeader *>(micro_block_.data_);
index_buffer_ = reinterpret_cast<const int32_t*>(micro_block_.data_ + micro_block_header_->row_index_offset_);
curr_row_.row_val_.cells_ = columns_;
curr_row_.row_val_.count_ = micro_block_header_->column_count_;
is_inited_ = true;
}
return ret;
}
int64_t FlatMicroBlockReader::count() const
{
return micro_block_header_->row_count_;
}
int FlatMicroBlockReader::set_index(const int64_t index)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(index < 0)
|| OB_UNLIKELY(index >= count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(index));
} else if (index >= count()) {
ret = OB_ITER_END;
} else {
curr_row_index_ = index;
}
return ret;
}
int64_t FlatMicroBlockReader::get_index() const
{
return curr_row_index_;
}
int FlatMicroBlockReader::get_value(const storage::ObStoreRow *&row)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
int64_t pos = micro_block_header_->header_size_ + *(index_buffer_ + curr_row_index_);
int64_t end_pos = micro_block_header_->header_size_ + *(index_buffer_ + curr_row_index_ + 1);
if (curr_row_index_ >= micro_block_header_->row_count_ || end_pos > micro_block_.size_) {
ret = OB_ITER_END;
} else {
reader_.reuse_allocator();
if (OB_FAIL(
reader_.read_row(
micro_block_.data_,
end_pos,
pos,
*micro_block_.column_map_,
allocator_,
curr_row_))) {
LOG_WARN("failed to read meta row", K(ret), K(pos), K(end_pos));
} else {
row = &curr_row_;
}
}
}
return ret;
}
int FlatMicroBlockReader::dump(const int64_t)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else {
const ObStoreRow *row = NULL;
dump_micro_header(micro_block_header_);
PrintHelper::print_dump_title("Micro Block", micro_block_.micro_id_, 1);
PrintHelper::print_dump_title("Total Rows", count(), 1);
for (int64_t i = 0; OB_SUCC(ret) && i < count(); ++i) {
if (OB_FAIL(set_index(i))) {
LOG_WARN("failed to set index", K(ret), K(i));
} else if (OB_FAIL(get_value(row))) {
LOG_WARN("failed to get micro block", K(ret));
} else if (OB_ISNULL(row)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row is NULL", K(ret));
} else {
PrintHelper::print_row_title(false /*use csv*/, row, i);
dump_row(row);
}
}
}
return ret;
}
void FlatMicroBlockReader::reset()
{
micro_block_.reset();
micro_block_header_ = NULL;
index_buffer_ = NULL;
curr_row_index_ = 0;
curr_row_.row_val_.count_ = 0;
is_inited_ = false;
}
void FlatMicroBlockReader::dump_micro_header(const ObMicroBlockHeader *micro_block_header)
{
PrintHelper::print_dump_title("Micro Header");
PrintHelper::print_dump_line("header_size", micro_block_header->header_size_);
PrintHelper::print_dump_line("version", micro_block_header->version_);
PrintHelper::print_dump_line("magic", micro_block_header->magic_);
PrintHelper::print_dump_line("attr", micro_block_header->attr_);
PrintHelper::print_dump_line("column_count", micro_block_header->column_count_);
PrintHelper::print_dump_line("row_index_offset", micro_block_header->row_index_offset_);
PrintHelper::print_dump_line("row_count", micro_block_header->row_count_);
PrintHelper::print_end_line();
}
int ObDumpsstPartitionImage::init()
{
int ret = OB_SUCCESS;
if (OB_FAIL(SLOGGER.register_redo_module(OB_REDO_LOG_PARTITION, this))) {
STORAGE_LOG(WARN, "failed to register_redo_module", K(ret));
}
return ret;
}
ObDumpsstPartitionImage &ObDumpsstPartitionImage::get_instance()
{
static ObDumpsstPartitionImage pi;
return pi;
}
}
}

View File

@ -0,0 +1,357 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <unistd.h>
#include "common/rowkey/ob_rowkey.h"
#include "storage/ob_i_store.h"
#include "storage/ob_partition_store.h"
#include "storage/blocksstable/ob_column_map.h"
#include "storage/blocksstable/ob_block_sstable_struct.h"
#include "storage/blocksstable/ob_macro_block_common_header.h"
#include "storage/blocksstable/slog/ob_base_storage_logger.h"
#include "ob_admin_dumpsst_print_helper.h"
#define ALLOC_OBJECT_1(allocator, obj, T, ...) \
do \
{ \
obj = NULL; \
void *tmp = reinterpret_cast<void *>(allocator.alloc(sizeof(T))); \
if (NULL == tmp) \
{ \
_OB_LOG(WARN, "alloc mem for %s", #T); \
} \
else \
{ \
obj = new(tmp) T( __VA_ARGS__ ); \
} \
} \
while (0)
namespace oceanbase
{
namespace tools
{
int parse_string(const char* src, const char del, const char* dst[], int64_t& size);
int parse_table_key(const char *str, storage::ObITable::TableKey &table_key);
int parse_partition_key(const char *str, ObPartitionKey &pkey);
int parse_version_range(const char *str, ObVersionRange &version_range);
int parse_log_ts_range(const char *str, ObLogTsRange &log_ts_range);
struct DataBlock
{
DataBlock() : data_(NULL), size_(0) {}
const char *data_;
int64_t size_;
void reset()
{
data_ = NULL;
size_ = 0;
}
};
struct MacroBlock : public DataBlock
{
MacroBlock() : DataBlock(), macro_id_(-1), common_header_() {}
int setup(const char *data, const int64_t size, const int64_t macro_id);
bool is_valid() const
{
return macro_id_ >= 0
&& data_ != NULL
&& size_ > 0
&& common_header_.is_valid();
}
void reset()
{
macro_id_ = -1;
common_header_.reset();
DataBlock::reset();
}
TO_STRING_KV(KP_(data),
K_(size),
K_(macro_id));
int64_t macro_id_;
blocksstable::ObMacroBlockCommonHeader common_header_;
};
struct MicroBlock : public DataBlock
{
MicroBlock() : DataBlock(),
row_store_type_(-1),
micro_id_(-1),
column_map_(NULL),
column_id_list_(NULL),
column_cnt_(0)
{}
int setup(const char *data, const int64_t size, const int64_t micro_id,
const blocksstable::ObColumnMap *column_map, const int64_t row_store_type,
const uint16_t *column_id_list, const int64_t column_cnt);
void reset()
{
row_store_type_ = -1;
column_map_ = NULL;
micro_id_ = -1;
column_cnt_ = 0;
column_id_list_ = NULL;
DataBlock::reset();
}
bool is_valid() const
{
return data_ != NULL
&& size_ > 0
&& micro_id_ >= 0
&& column_map_ != NULL
&& column_id_list_ != NULL
&& column_cnt_ > 0;
}
TO_STRING_KV(KP_(data),
K_(size),
K_(micro_id),
K_(row_store_type),
K_(column_cnt));
int64_t row_store_type_;
int64_t micro_id_;
const blocksstable::ObColumnMap *column_map_;
const uint16_t *column_id_list_;
int64_t column_cnt_;
};
template <typename T>
class DataBlockReader
{
public:
virtual int64_t count() const = 0;
virtual int set_index(const int64_t index) = 0;
virtual int64_t get_index() const = 0;
virtual int get_value(const T *&value) = 0;
virtual int dump(const int64_t index) = 0;
public:
static void dump_common_header(const blocksstable::ObMacroBlockCommonHeader *common_header);
static void dump_sstable_header(const blocksstable::ObSSTableMacroBlockHeader *sstable_header);
static void dump_linked_header(const blocksstable::ObLinkedMacroBlockHeader *linked_header);
static void dump_row(const storage::ObStoreRow *row, const bool use_csv = false);
};
template <typename T>
void DataBlockReader<T>::dump_common_header(const blocksstable::ObMacroBlockCommonHeader *common_header)
{
PrintHelper::print_dump_title("Common Header");
fprintf(stdout, "%s\n", to_cstring(*common_header));
PrintHelper::print_end_line();
}
template <typename T>
void DataBlockReader<T>::dump_sstable_header(const blocksstable::ObSSTableMacroBlockHeader *sstable_header)
{
PrintHelper::print_dump_title("SSTable Header");
PrintHelper::print_dump_line("header_size", sstable_header->header_size_);
PrintHelper::print_dump_line("version", sstable_header->version_);
PrintHelper::print_dump_line("magic", sstable_header->magic_);
PrintHelper::print_dump_line("attr", sstable_header->attr_);
PrintHelper::print_dump_line("table_id", sstable_header->table_id_);
PrintHelper::print_dump_line("data_version", sstable_header->data_version_);
PrintHelper::print_dump_line("column_count", sstable_header->column_count_);
PrintHelper::print_dump_line("rowkey_column_count", sstable_header->rowkey_column_count_);
PrintHelper::print_dump_line("row_store_type", sstable_header->row_store_type_);
PrintHelper::print_dump_line("row_count", sstable_header->row_count_);
PrintHelper::print_dump_line("occupy_size", sstable_header->occupy_size_);
PrintHelper::print_dump_line("micro_block_count", sstable_header->micro_block_count_);
PrintHelper::print_dump_line("micro_block_size", sstable_header->micro_block_size_);
PrintHelper::print_dump_line("micro_block_data_offset", sstable_header->micro_block_data_offset_);
PrintHelper::print_dump_line("micro_block_index_offset", sstable_header->micro_block_index_offset_);
PrintHelper::print_dump_line("micro_block_index_size", sstable_header->micro_block_index_size_);
PrintHelper::print_dump_line("micro_block_endkey_offset", sstable_header->micro_block_endkey_offset_);
PrintHelper::print_dump_line("micro_block_endkey_size", sstable_header->micro_block_endkey_size_);
PrintHelper::print_dump_line("data_checksum", sstable_header->data_checksum_);
PrintHelper::print_dump_line("compressor_name", sstable_header->compressor_name_);
PrintHelper::print_dump_line("data_seq", sstable_header->data_seq_);
PrintHelper::print_dump_line("partition_id", sstable_header->partition_id_);
PrintHelper::print_end_line();
}
template <typename T>
void DataBlockReader<T>::dump_linked_header(const blocksstable::ObLinkedMacroBlockHeader *linked_header)
{
PrintHelper::print_dump_title("Lined Header");
PrintHelper::print_dump_line("header_size", linked_header->header_size_);
PrintHelper::print_dump_line("version", linked_header->version_);
PrintHelper::print_dump_line("magic", linked_header->magic_);
PrintHelper::print_dump_line("attr", linked_header->attr_);
PrintHelper::print_dump_line("meta_data_offset", linked_header->meta_data_offset_);
PrintHelper::print_dump_line("meta_data_count", linked_header->meta_data_count_);
PrintHelper::print_dump_line("previous_block_index", linked_header->previous_block_index_);
PrintHelper::print_dump_line("total_previous_count", linked_header->total_previous_count_);
PrintHelper::print_dump_line("user_data1", linked_header->user_data1_);
PrintHelper::print_dump_line("user_data1", linked_header->user_data2_);
PrintHelper::print_end_line();
}
template <typename T>
void DataBlockReader<T>::dump_row(const storage::ObStoreRow *row, const bool use_csv)
{
for (int64_t i = 0; i < row->row_val_.count_; ++i) {
common::ObObj &cell = row->row_val_.cells_[i];
PrintHelper::print_cell(cell, use_csv);
}
fprintf(stderr, "\n");
}
class MacroBlockReader : public DataBlockReader<MicroBlock>
{
public:
MacroBlockReader() : sstable_header_(NULL),
macro_meta_(NULL),
column_id_list_(NULL),
column_type_list_(NULL),
column_checksum_(NULL),
curr_micro_block_id_(-1),
cur_mib_rh_(NULL),
is_inited_(false)
{}
virtual ~MacroBlockReader() {};
int init(const char *data, const int64_t size, const int64_t macro_id, const blocksstable::ObMacroBlockMeta *macro_meta);
virtual int64_t count() const override;
virtual int set_index(const int64_t index) override;
virtual int64_t get_index() const override;
virtual int get_value(const MicroBlock *&value) override;
virtual int dump(const int64_t micro_id) override;
void reset();
private:
int parse_micro_block_index();
void dump_micro_index(const ObRowkey& endkey, const blocksstable::ObPosition& datapos,
const blocksstable::ObPosition& keypos, const bool mark_deletion, int32_t num);
int parse_micro_block_key(const char* buf, int64_t& pos, int64_t row_end_pos,
const ObObjMeta* type_list, ObObj* objs, int32_t rowkey_count);
void parse_one_micro_block_index(const char* index_ptr, const int64_t idx,
blocksstable::ObPosition& datapos, blocksstable::ObPosition& keypos);
int build_column_map();
int set_current_micro_block();
int dump_header();
int get_micro_block_payload_buffer(const char* compressor_name,
const char* buf,
const int64_t size,
const char*& out,
int64_t& outsize,
const blocksstable::ObRecordHeaderV3 *&rh);
private:
static const int64_t ALL_MINOR_INDEX = -1;
MacroBlock macro_block_;
const blocksstable::ObSSTableMacroBlockHeader *sstable_header_;
const blocksstable::ObMacroBlockMeta *macro_meta_;
const uint16_t * column_id_list_;
const common::ObObjMeta *column_type_list_;
const int64_t *column_checksum_;
int64_t curr_micro_block_id_;
MicroBlock curr_micro_block_;
common::ObSEArray<blocksstable::ObPosition, 100> micro_index_pos_;
common::ObSEArray<blocksstable::ObPosition, 100> micro_index_keys_;
common::ObSEArray<bool, 100> micro_mark_deletions_;
blocksstable::ObColumnMap column_map_;
const blocksstable::ObRecordHeaderV3 *cur_mib_rh_;
common::ObArenaAllocator allocator_;
bool is_inited_;
};
class MicroBlockIReader : public DataBlockReader<storage::ObStoreRow>
{
public:
virtual int64_t count() const = 0;
virtual int set_index(const int64_t index) = 0;
virtual int64_t get_index() const = 0;
virtual int get_value(const storage::ObStoreRow *&value) = 0;
virtual int dump(const int64_t index) = 0;
virtual int init(const MicroBlock &micro_block) = 0;
};
class FlatMicroBlockReader : public MicroBlockIReader
{
public:
FlatMicroBlockReader() : micro_block_header_(NULL),
index_buffer_(NULL),
curr_row_index_(0),
is_inited_(false)
{}
virtual int init(const MicroBlock &micro_block) override;
virtual int64_t count() const override;
virtual int set_index(const int64_t index) override;
virtual int64_t get_index() const override;
virtual int get_value(const storage::ObStoreRow *&value) override;
virtual int dump(const int64_t index) override;
void reset();
private:
static void dump_micro_header(const blocksstable::ObMicroBlockHeader* micro_block_header);
private:
MicroBlock micro_block_;
const blocksstable::ObMicroBlockHeader *micro_block_header_;
const int32_t *index_buffer_;
blocksstable::ObFlatRowReader reader_;
int64_t curr_row_index_;
common::ObObj columns_[common::OB_MAX_COLUMN_NUMBER];
storage::ObStoreRow curr_row_;
common::ObArenaAllocator allocator_;
bool is_inited_;
};
class MicroBlockReader : public DataBlockReader<storage::ObStoreRow>
{
public:
MicroBlockReader() : micro_reader_(NULL), is_inited_(false) {}
int init(const MicroBlock &micro_block);
virtual int64_t count() const override;
virtual int set_index(const int64_t index) override;
virtual int64_t get_index() const override;
virtual int get_value(const storage::ObStoreRow *&value) override;
virtual int dump(const int64_t index) override;
void reset();
private:
MicroBlock micro_block_;
MicroBlockIReader *micro_reader_;
FlatMicroBlockReader flat_micro_reader_;
bool is_inited_;
};
class ObDumpsstPartition
{
public:
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos);
int get_latest_store();
private:
storage::ObPartitionStore store_;
};
class ObDumpsstPartitionImage : public blocksstable::ObIRedoModule
{
public:
static ObDumpsstPartitionImage &get_instance();
int init();
virtual int replay(const blocksstable::ObRedoModuleReplayParam &param) override
{
UNUSEDx(param);
return common::OB_SUCCESS;
}
virtual int parse(
const int64_t subcmd,
const char *buf,
const int64_t len,
FILE *stream) override
{
UNUSEDx(subcmd, buf, len, stream);
return common::OB_SUCCESS;
}
private:
ObDumpsstPartitionImage() {}
~ObDumpsstPartitionImage() {}
};
}
}

View File

@ -18,6 +18,11 @@
#include "ob_admin_executor.h"
#include "clog_tool/ob_admin_clog_v2_executor.h"
#include "usec_tool/ob_admin_usec_executor.h"
#include "slog_tool/ob_admin_slog_executor.h"
#include "dumpsst/ob_admin_dumpsst_executor.h"
#include "dumpsst/ob_admin_cmp_micro_executor.h"
#include "archive_tool/ob_admin_log_archive_executor.h"
#include "backup_tool/ob_admin_dump_backup_data_executor.h"
using namespace oceanbase::common;
using namespace oceanbase::tools;
@ -27,7 +32,11 @@ void print_usage()
fprintf(stderr,
"\nUSAGE:\n"
" ob_admin clog_tool\n"
" ob_admin usec_tool\n");
" ob_admin usec_tool\n"
" ob_admin slog_tool\n"
" ob_admin dumpsst\n"
" ob_admin archive_tool\n"
" ob_admin dump_backup\n");
}
int main(int argc, char *argv[])
@ -51,6 +60,16 @@ int main(int argc, char *argv[])
executor = new ObAdminClogV2Executor();
} else if (0 == strcmp("usec_tool", argv[1])) {
executor = new ObAdminUsecExecutor();
} else if (0 == strcmp("slog_tool", argv[1])) {
executor = new ObAdminSlogExecutor();
} else if (0 == strcmp("dumpsst", argv[1])) {
executor = new ObAdminDumpsstExecutor();
} else if (0 == strcmp("cmp_micro", argv[1])) {
executor = new ObAdminCmpMicroExecutor();
} else if (0 == strcmp("archive_tool", argv[1])) {
executor = new ObAdminLogArchiveExecutor();
} else if (0 == strcmp("dump_backup", argv[1])) {
executor = new ObAdminDumpBackupDataExecutor();
} else {
print_usage();
}

View File

@ -0,0 +1,122 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX COMMON
#include "ob_admin_slog_executor.h"
#include "storage/blocksstable/slog/ob_base_storage_logger.h"
#include "storage/blocksstable/ob_macro_block_meta_mgr.h"
#include "storage/ob_partition_service.h"
#include "storage/ob_table_mgr.h"
#include "storage/ob_tenant_config_mgr.h"
#include "storage/ob_tenant_file_mgr.h"
#include "../dumpsst/ob_admin_dumpsst_utils.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
using namespace oceanbase::blocksstable;
using namespace oceanbase::storage;
namespace oceanbase
{
namespace tools
{
ObAdminSlogExecutor::ObAdminSlogExecutor()
: log_dir_(NULL),
log_file_id_(0)
{
}
int ObAdminSlogExecutor::execute(int argc, char *argv[])
{
int ret = OB_SUCCESS;
reset();
ObPartitionMetaRedoModule partition_module;
ObBaseFileMgr file_mgr;
if (OB_FAIL(parse_cmd(argc - 1, argv + 1))) {
LOG_WARN("fail to parse cmd", K(ret));
} else {
if (NULL != log_dir_ && log_file_id_ > 0) {
if (OB_FAIL(SLOGGER.register_redo_module(OB_REDO_LOG_PARTITION, &partition_module))) {
LOG_WARN("fail to register partition module", K(ret));
} else if (OB_FAIL(SLOGGER.register_redo_module(OB_REDO_LOG_MACROBLOCK,
&ObMacroBlockMetaMgr::get_instance()))) {
LOG_WARN("fail to register macro module", K(ret));
} else if (OB_FAIL(SLOGGER.register_redo_module(OB_REDO_LOG_TABLE_MGR,
&ObTableMgr::get_instance()))) {
LOG_WARN("fail to register table module", K(ret));
} else if (OB_FAIL(SLOGGER.register_redo_module(OB_REDO_LOG_TENANT_CONFIG,
&ObTenantConfigMgr::get_instance()))) {
LOG_WARN("fail to register tenant config module", K(ret));
} else if (OB_FAIL(SLOGGER.register_redo_module(OB_REDO_LOG_TENANT_FILE, &file_mgr))) {
LOG_WARN("fail to register tenant file module", K(ret));
} else if (OB_FAIL(SLOGGER.parse_log(log_dir_, log_file_id_, stdout))) {
LOG_WARN("fail to parse log file", K(ret));
}
}
}
return ret;
}
void ObAdminSlogExecutor::reset()
{
log_dir_ = NULL;
log_file_id_ = 0;
}
int ObAdminSlogExecutor::parse_cmd(int argc, char *argv[])
{
int ret = OB_SUCCESS;
int opt = 0;
const char* opt_string = "hf:";
struct option longopts[] =
{{"help", 0, NULL, 'h' },
{"log_file_path", 1, NULL, 'f' }};
while ((opt = getopt_long(argc, argv, opt_string, longopts, NULL)) != -1) {
switch (opt) {
case 'h': {
print_usage();
break;
}
case 'f': {
//find the last separator from the right of log_file_pathַ
char *p = strrchr(optarg, '/');
if (NULL == p) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log_file_path is not correct, ", K(ret), K(optarg));
} else {
//separate the log_dir and log_name by '\0'
*p = '\0';
log_dir_ = optarg;
log_file_id_ = atoi(p + 1);
}
break;
}
default: {
print_usage();
ret = OB_INVALID_ARGUMENT;
}
}
}
return ret;
}
void ObAdminSlogExecutor::print_usage()
{
fprintf(stderr, "\nUsage: ob_admin slog_tool -f log_file_name\n");
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ADMIN_SLOG_EXECUTOR_H_
#define OB_ADMIN_SLOG_EXECUTOR_H_
#include "../ob_admin_executor.h"
namespace oceanbase
{
namespace tools
{
class ObAdminSlogExecutor : public ObAdminExecutor
{
public:
ObAdminSlogExecutor();
virtual ~ObAdminSlogExecutor() = default;
virtual int execute(int argc, char *argv[]);
void reset();
private:
int parse_cmd(int argc, char *argv[]);
void print_usage();
private:
char *log_dir_;
int64_t log_file_id_;
};
}
}
#endif /* OB_ADMIN_SLOG_EXECUTOR_H_ */