456 lines
13 KiB
C++
456 lines
13 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#define USING_LOG_PREFIX OBLOG_TAILF
|
|
|
|
#include "oblog_main.h"
|
|
#include "ob_log_instance.h" // ObLogInstance
|
|
|
|
#include <stdio.h> // fprintf
|
|
#include <getopt.h> // getopt_long
|
|
#include <stdlib.h> // strtoull
|
|
|
|
#define LOG_STD(str, ...) \
|
|
do { \
|
|
fprintf(stderr, str, ##__VA_ARGS__); \
|
|
} while (0)
|
|
|
|
using namespace oceanbase::common;
|
|
namespace oceanbase
|
|
{
|
|
namespace liboblog
|
|
{
|
|
|
|
ObLogMain &ObLogMain::get_instance()
|
|
{
|
|
static ObLogMain oblog_main;
|
|
return oblog_main;
|
|
}
|
|
|
|
ObLogMain::ObLogMain() : inited_(false),
|
|
oblog_(NULL),
|
|
oblog_factory_(),
|
|
br_printer_(),
|
|
only_print_hex_(false),
|
|
print_hex_(false),
|
|
print_lob_md5_(false),
|
|
use_daemon_(false),
|
|
data_file_(NULL),
|
|
heartbeat_file_(NULL),
|
|
run_time_us_(-1),
|
|
config_file_(NULL),
|
|
print_console_(false),
|
|
verify_mode_(false),
|
|
enable_reentrant_(false),
|
|
output_br_detail_(false),
|
|
start_timestamp_usec_(0),
|
|
tenant_id_(OB_INVALID_TENANT_ID),
|
|
tg_match_pattern_(NULL),
|
|
last_heartbeat_timestamp_micro_sec_(0),
|
|
stop_flag_(true)
|
|
{
|
|
}
|
|
|
|
ObLogMain::~ObLogMain()
|
|
{
|
|
destroy();
|
|
}
|
|
|
|
int ObLogMain::init(int argc, char **argv)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (OB_FAIL(parse_args_(argc, argv))) {
|
|
if (OB_IN_STOP_STATE != ret) {
|
|
LOG_ERROR("parse arguments fail", K(ret), K(argc));
|
|
}
|
|
} else if (! check_args_()) {
|
|
LOG_ERROR("check arguments fail");
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else if (OB_FAIL(br_printer_.init(data_file_, heartbeat_file_, print_console_, only_print_hex_, print_hex_,
|
|
print_lob_md5_, verify_mode_, output_br_detail_))) {
|
|
LOG_ERROR("init binlog record printer fail", K(ret), K(data_file_), K(heartbeat_file_),
|
|
K(print_console_), K(only_print_hex_), K(print_hex_), K(print_lob_md5_), K(verify_mode_), K_(output_br_detail));
|
|
} else {
|
|
stop_flag_ = true;
|
|
inited_ = true;
|
|
last_heartbeat_timestamp_micro_sec_ = start_timestamp_usec_;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
void ObLogMain::destroy()
|
|
{
|
|
stop();
|
|
|
|
inited_ = false;
|
|
oblog_ = NULL;
|
|
only_print_hex_ = false;
|
|
print_hex_ = false;
|
|
print_lob_md5_ = false;
|
|
use_daemon_ = false;
|
|
data_file_ = NULL;
|
|
heartbeat_file_ = NULL;
|
|
run_time_us_ = 0;
|
|
config_file_ = NULL;
|
|
print_console_ = false;
|
|
verify_mode_ = false;
|
|
enable_reentrant_ = false;
|
|
start_timestamp_usec_ = 0;
|
|
tenant_id_ = OB_INVALID_TENANT_ID;
|
|
tg_match_pattern_ = NULL;
|
|
last_heartbeat_timestamp_micro_sec_ = 0;
|
|
stop_flag_ = true;
|
|
output_br_detail_ = false;
|
|
br_printer_.destroy();
|
|
}
|
|
|
|
int ObLogMain::parse_args_(int argc, char **argv)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
// option variables
|
|
int opt = -1;
|
|
const char *opt_string = "ivdD:f:hH:oVt:rR:OxmT:P";
|
|
struct option long_opts[] =
|
|
{
|
|
{"use_daemon", 0, NULL, 'd'},
|
|
{"data_file", 1, NULL, 'D'},
|
|
{"config_file",1,NULL,'f'},
|
|
{"help", 0, NULL, 'h'},
|
|
{"heartbeat_file", 1, NULL, 'H'},
|
|
{"print_console",0,NULL, 'o'},
|
|
{"verify_mode", 0, NULL, 'V'},
|
|
{"start_timestamp", 1, NULL, 't'}, // t: Represents a start-up timestamp in seconds
|
|
{"start_timestamp_usec", 1, NULL, 'T'}, // T: Represents a start-up timestamp in microsecond units
|
|
{"data_start_schema_version", 1, NULL, 's'},
|
|
{"enable_reentrant", 0, NULL, 'r'},
|
|
{"run_time_sec", 1, NULL, 'R'},
|
|
{"only_print_hex", 0, NULL, 'O'},
|
|
{"print_hex", 0, NULL, 'x'},
|
|
{"print_lob_md5", 0, NULL, 'm'},
|
|
{"version", 0, NULL, 'v'},
|
|
{"verify_begin_trans_id", 0, NULL, 'P'},
|
|
{"output_br_detail", 0, NULL, 'i'},
|
|
{0, 0, 0, 0}
|
|
};
|
|
|
|
if (argc <= 1) {
|
|
print_usage(argv[0]);
|
|
ret = OB_IN_STOP_STATE;
|
|
}
|
|
|
|
// Parse command line
|
|
while (OB_SUCCESS == ret && (opt = getopt_long(argc, argv, opt_string, long_opts, NULL)) != -1) {
|
|
switch (opt) {
|
|
case 'f': {
|
|
config_file_ = optarg;
|
|
break;
|
|
}
|
|
|
|
case 'd': {
|
|
use_daemon_ = true;
|
|
break;
|
|
}
|
|
|
|
case 'D': {
|
|
data_file_ = optarg;
|
|
break;
|
|
}
|
|
|
|
case 'h': {
|
|
print_usage(argv[0]);
|
|
ret = OB_IN_STOP_STATE;
|
|
break;
|
|
}
|
|
|
|
case 'H': {
|
|
heartbeat_file_ = optarg;
|
|
break;
|
|
}
|
|
|
|
case 'o': {
|
|
print_console_ = true;
|
|
break;
|
|
}
|
|
|
|
case 'V': {
|
|
verify_mode_ = true;
|
|
break;
|
|
}
|
|
|
|
case 't': {
|
|
start_timestamp_usec_ = strtoll(optarg, NULL, 10) * 1000000L;
|
|
break;
|
|
}
|
|
|
|
case 'T': {
|
|
start_timestamp_usec_ = strtoll(optarg, NULL, 10);
|
|
break;
|
|
}
|
|
|
|
case 'r': {
|
|
enable_reentrant_ = true;
|
|
break;
|
|
}
|
|
|
|
case 'R': {
|
|
run_time_us_ = strtoll(optarg, NULL, 10) * 1000000;
|
|
LOG_STD("RUN_TIME: %ld seconds\n", run_time_us_ / 1000000);
|
|
break;
|
|
}
|
|
|
|
case 'O': {
|
|
only_print_hex_ = true;
|
|
break;
|
|
}
|
|
|
|
case 'x': {
|
|
print_hex_ = true;
|
|
break;
|
|
}
|
|
|
|
case 'm': {
|
|
print_lob_md5_ = true;
|
|
break;
|
|
}
|
|
|
|
case 'i': {
|
|
// output detail info of binlog record, default off
|
|
output_br_detail_ = true;
|
|
break;
|
|
}
|
|
|
|
case 'v': {
|
|
ObLogInstance::print_version();
|
|
ret = OB_IN_STOP_STATE;
|
|
break;
|
|
}
|
|
case 'P': {
|
|
// Verify that the begin trans_id function does not fall back and is turned on by default
|
|
LOG_STD("verify_begin_trans_id\n");
|
|
break;
|
|
}
|
|
|
|
default:
|
|
ret = OB_ERROR;
|
|
LOG_ERROR("unknown parameters", K(opt), K(opt_string));
|
|
break;
|
|
} // end switch
|
|
} // end while
|
|
|
|
return ret;
|
|
}
|
|
|
|
void ObLogMain::print_usage(const char *prog_name)
|
|
{
|
|
LOG_STD("USAGE: %s -f config_file_path\n\n"
|
|
" -v, --version print version\n"
|
|
" -d, --use_daemon start as daemon, default no daemon\n"
|
|
" -D, --data_file data file used to store data\n"
|
|
" -f, --config_file configuration file\n"
|
|
" -h, --help display this help\n"
|
|
" -H, --heartbeat_file heartbeat file used to store heartbeat data\n"
|
|
" -o, --print_console output result to stderr or stdout, default not output\n"
|
|
" -V, --verify_mode start verify mode\n"
|
|
" -t, --start_timestamp start timestamp in second, default current timestamp\n"
|
|
" -T, --start_timestamp_usec start timestamp in micro second, default current timestamp\n"
|
|
" -r, --enable_reentrant enable reentrant after stop, default disable\n"
|
|
" -R, --run_time_sec run time in seconds, default -1, means to run forever\n"
|
|
" -x, --print_hex print hex for newcolumn, to check implicit char\n"
|
|
" -m, --print_lob_md5 print md5 info for LOB data\n"
|
|
" -i, --output_br_detail output immutable detail info of binlog record, default not output\n"
|
|
|
|
"\neg: %s -f liboblog.conf\n",
|
|
prog_name, prog_name);
|
|
}
|
|
|
|
bool ObLogMain::check_args_()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int nochdir = 1;
|
|
int noclose = 0;
|
|
|
|
if (NULL == config_file_) {
|
|
LOG_ERROR("config file is missing");
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else if (use_daemon_ && daemon(nochdir, noclose) < 0) {
|
|
LOG_ERROR("create daemon process error", K(errno), KERRMSG);
|
|
ret = OB_ERR_UNEXPECTED;
|
|
}
|
|
|
|
return OB_SUCCESS == ret;
|
|
}
|
|
|
|
int ObLogMain::start()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (! inited_) {
|
|
ret = OB_NOT_INIT;
|
|
} else if (NULL != oblog_) {
|
|
LOG_ERROR("oblog has started");
|
|
ret = OB_NOT_SUPPORTED;
|
|
} else if (stop_flag_) {
|
|
stop_flag_ = false;
|
|
|
|
if (NULL == (oblog_ = oblog_factory_.construct_oblog())) {
|
|
LOG_ERROR("construct oblog fail");
|
|
ret = OB_INIT_FAIL;
|
|
} else {
|
|
ObLogInstance *instance = (ObLogInstance *)oblog_;
|
|
// Disable redirected output
|
|
instance->set_disable_redirect_log(true);
|
|
|
|
if (OB_FAIL(instance->init_with_start_tstamp_usec(config_file_, start_timestamp_usec_, handle_error))) {
|
|
LOG_ERROR("init oblog fail", K(ret), K_(config_file), K_(start_timestamp_usec), KP(handle_error));
|
|
} else {
|
|
// do nothing
|
|
}
|
|
|
|
if (OB_SUCC(ret)) {
|
|
if (OB_FAIL(oblog_->launch())) {
|
|
LOG_ERROR("launch oblog fail", K(ret));
|
|
} else {
|
|
OB_LOGGER.set_log_level(instance->get_log_level());
|
|
OB_LOGGER.set_file_name(instance->get_log_file(), true, false);
|
|
LOG_INFO("start oblog success");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
void ObLogMain::stop()
|
|
{
|
|
stop_flag_ = true;
|
|
|
|
if (NULL != oblog_) {
|
|
oblog_->stop();
|
|
oblog_->destroy();
|
|
oblog_factory_.deconstruct(oblog_);
|
|
oblog_ = NULL;
|
|
}
|
|
}
|
|
|
|
void ObLogMain::run()
|
|
{
|
|
if (inited_ && NULL != oblog_) {
|
|
int ret = OB_SUCCESS;
|
|
int64_t end_time = ::oceanbase::common::ObTimeUtility::current_time() + run_time_us_;
|
|
|
|
while (OB_SUCCESS == ret && ! stop_flag_) {
|
|
ILogRecord *br = NULL;
|
|
ret = oblog_->next_record(&br, NEXT_RECORD_TIMEOUT);
|
|
|
|
if (OB_SUCC(ret)) {
|
|
if (OB_FAIL(verify_record_info_(br))) {
|
|
LOG_ERROR("verify_record_info_ fail", K(ret), K(br));
|
|
}
|
|
// output binlog record
|
|
else if (OB_FAIL(br_printer_.print_binlog_record(br))) {
|
|
LOG_ERROR("print_binlog_record fail", K(ret));
|
|
} else {
|
|
oblog_->release_record(br);
|
|
br = NULL;
|
|
}
|
|
} else if (OB_TIMEOUT == ret) {
|
|
int64_t left_time = end_time - ::oceanbase::common::ObTimeUtility::current_time();
|
|
if (run_time_us_ > 0 && left_time <= 0) {
|
|
ret = OB_TIMEOUT;
|
|
} else {
|
|
ret = OB_SUCCESS;
|
|
}
|
|
} else if (OB_ITER_END == ret) {
|
|
LOG_INFO("iterate BinlogRecord to the END");
|
|
stop_flag_ = true;
|
|
ret = OB_SUCCESS;
|
|
} else if (OB_IN_STOP_STATE == ret) {
|
|
stop_flag_ = true;
|
|
ret = OB_SUCCESS;
|
|
} else {
|
|
LOG_ERROR("next_record fail", K(ret));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void ObLogMain::handle_error(const ObLogError &err)
|
|
{
|
|
LOG_INFO("stop oblog on error", "level", err.level_, "errno", err.errno_, "errmsg", err.errmsg_);
|
|
ObLogMain::get_instance().mark_stop_flag(true);
|
|
}
|
|
|
|
int ObLogMain::verify_record_info_(ILogRecord *br)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
static bool is_first_br = true;
|
|
ObLogBR *oblog_br = NULL;
|
|
|
|
if (OB_UNLIKELY(! inited_)) {
|
|
LOG_ERROR("ObLogMain has not inited");
|
|
ret = OB_NOT_INIT;
|
|
} else if (OB_ISNULL(br)) {
|
|
LOG_ERROR("br is null");
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else if (OB_ISNULL(oblog_br = reinterpret_cast<ObLogBR *>(br->getUserData()))) {
|
|
LOG_ERROR("get user data fail", K(br), K(oblog_br));
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else {
|
|
// heartbeat, updtae last_heartbeat_timestamp_micro_sec_
|
|
if (HEARTBEAT == br->recordType()) {
|
|
int64_t timestamp_usec = OB_INVALID_TIMESTAMP;
|
|
if (is_first_br) {
|
|
// oblog_tailf -f $CONFIG -t 0 means start at current time
|
|
// The liboblog start timestamp is not available
|
|
// So the first BinlogRecord is obtained based on the checkpoint
|
|
timestamp_usec = br->getCheckpoint1() * 1000000 + br->getCheckpoint2();
|
|
is_first_br = false;
|
|
} else {
|
|
timestamp_usec = br->getTimestamp() * 1000000 + br->getRecordUsec();
|
|
}
|
|
last_heartbeat_timestamp_micro_sec_ = std::max(timestamp_usec, last_heartbeat_timestamp_micro_sec_);
|
|
}
|
|
|
|
// Calibration timestamp and checkpoint
|
|
int64_t precise_timestamp = oblog_br->get_precise_timestamp();
|
|
int64_t timestamp_sec = precise_timestamp / 1000000;
|
|
int64_t timestamp_usec = precise_timestamp % 1000000;
|
|
int64_t expect_checkpoint1 = last_heartbeat_timestamp_micro_sec_ / 1000000;
|
|
int64_t expect_checkpoint2 = last_heartbeat_timestamp_micro_sec_ % 1000000;
|
|
|
|
if (OB_UNLIKELY(timestamp_sec != br->getTimestamp())
|
|
|| OB_UNLIKELY(timestamp_usec != br->getRecordUsec())) {
|
|
LOG_ERROR("timestamp is not right", K(precise_timestamp), "br_sec", br->getTimestamp(),
|
|
"br_usec", br->getRecordUsec());
|
|
ret = OB_ERR_UNEXPECTED;
|
|
} else if (OB_UNLIKELY(expect_checkpoint1 != br->getCheckpoint1())
|
|
|| OB_UNLIKELY(expect_checkpoint2 != br->getCheckpoint2())) {
|
|
LOG_ERROR("checkpoint is not right", K(br), K(last_heartbeat_timestamp_micro_sec_),
|
|
K(expect_checkpoint1), "br_checkpoint1", br->getCheckpoint1(),
|
|
K(expect_checkpoint2), "br_checkpoint2", br->getCheckpoint2(),
|
|
"getTimestamp", br->getTimestamp(), "getRecordUsec", br->getRecordUsec(),
|
|
K(is_first_br));
|
|
ret = OB_ERR_UNEXPECTED;
|
|
} else {
|
|
// succ
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
} // namespace liboblog
|
|
} // namespace oceanbase
|