[FEAT MERGE] merge the feature OBServer supports specified IP start to master
This commit is contained in:
@ -520,8 +520,8 @@ int TableAccessor::get_self_zone_name(ObStringHolder &zone_name_holder)
|
||||
int64_t pos = 0;
|
||||
const char *columns[1] = {"zone"};
|
||||
char where_condition[STACK_BUFFER_SIZE] = {0};
|
||||
char ip_string[16] = {0};
|
||||
if (!GCTX.self_addr().ip_to_string(ip_string, 16)) {
|
||||
char ip_string[INET6_ADDRSTRLEN] = {0};
|
||||
if (!GCTX.self_addr().ip_to_string(ip_string, sizeof(ip_string))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COORDINATOR_LOG_(WARN, "ip to string failed");
|
||||
} else if (CLICK_FAIL(databuff_printf(where_condition, STACK_BUFFER_SIZE, pos, "where svr_ip='%s' and svr_port=%d", ip_string, GCTX.self_addr().get_port()))) {
|
||||
|
||||
@ -68,6 +68,7 @@ static void print_help()
|
||||
MPRINT(" -c,--cluster_id ID cluster id");
|
||||
MPRINT(" -d,--data_dir DIR OceanBase data directory");
|
||||
MPRINT(" -i,--devname DEV net dev interface");
|
||||
MPRINT(" -I,--local_ip ip of the current machine");
|
||||
MPRINT(" -o,--optstr OPTSTR extra options string");
|
||||
MPRINT(" -r,--rs_list RS_LIST root service list");
|
||||
MPRINT(" -l,--log_level LOG_LEVEL server log level");
|
||||
@ -163,6 +164,7 @@ static void get_opts_setting(
|
||||
{"scn", 'f', 1},
|
||||
{"version", 'V', 0},
|
||||
{"ipv6", '6', 0},
|
||||
{"local_ip", 'I', 1},
|
||||
};
|
||||
|
||||
size_t opts_cnt = sizeof(ob_opts) / sizeof(ob_opts[0]);
|
||||
@ -278,6 +280,12 @@ parse_short_opt(const int c, const char *value, ObServerOptions &opts)
|
||||
case '6':
|
||||
opts.use_ipv6_ = true;
|
||||
break;
|
||||
|
||||
case 'I':
|
||||
MPRINT("local_ip: %s", value);
|
||||
opts.local_ip_ = value;
|
||||
break;
|
||||
|
||||
case 'h':
|
||||
default:
|
||||
print_help();
|
||||
|
||||
@ -46,7 +46,8 @@ ObHeartBeatProcess::ObHeartBeatProcess(const ObGlobalContext &gctx,
|
||||
newest_lease_info_version_(0),
|
||||
gctx_(gctx),
|
||||
schema_updater_(schema_updater),
|
||||
lease_state_mgr_(lease_state_mgr)
|
||||
lease_state_mgr_(lease_state_mgr),
|
||||
server_id_persist_task_()
|
||||
{
|
||||
}
|
||||
|
||||
@ -173,6 +174,15 @@ int ObHeartBeatProcess::do_heartbeat_event(const ObLeaseResponse &lease_response
|
||||
"old_id", GCTX.server_id_,
|
||||
"new_id", lease_response.server_id_);
|
||||
GCTX.server_id_ = lease_response.server_id_;
|
||||
GCONF.server_id = lease_response.server_id_;
|
||||
const int64_t delay = 0;
|
||||
const bool repeat = false;
|
||||
if (OB_SUCCESS != (tmp_ret = TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, server_id_persist_task_, delay, repeat))) {
|
||||
server_id_persist_task_.enable_need_retry_flag();
|
||||
LOG_WARN("schedule server_id persist task failed", K(tmp_ret));
|
||||
} else {
|
||||
server_id_persist_task_.disable_need_retry_flag();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -214,6 +224,14 @@ int ObHeartBeatProcess::do_heartbeat_event(const ObLeaseResponse &lease_response
|
||||
if (OB_SUCCESS != (tmp_ret = OTC_MGR.got_versions(lease_response.tenant_config_version_))) {
|
||||
LOG_WARN("tenant got versions failed", K(tmp_ret));
|
||||
}
|
||||
|
||||
if (server_id_persist_task_.is_need_retry()) {
|
||||
if (OB_SUCCESS != (tmp_ret = TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, server_id_persist_task_, delay, repeat))) {
|
||||
LOG_WARN("schedule server_id persist task failed", K(tmp_ret));
|
||||
} else {
|
||||
server_id_persist_task_.disable_need_retry_flag();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -299,5 +317,26 @@ void ObHeartBeatProcess::ObZoneLeaseInfoUpdateTask::runTimerTask()
|
||||
}
|
||||
}
|
||||
|
||||
void ObHeartBeatProcess::ObServerIdPersistTask::runTimerTask()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_retry = false;
|
||||
if (OB_NOT_NULL(GCTX.config_mgr_)) {
|
||||
if (OB_FAIL(GCTX.config_mgr_->dump2file())) {
|
||||
need_retry = true;
|
||||
LOG_WARN("dump server id to file failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
need_retry = true;
|
||||
LOG_WARN("GCTX.config_mgr_ is NULL, observer may not init");
|
||||
}
|
||||
if (need_retry) {
|
||||
// retry server id persistence task in 1s later
|
||||
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, *this, 1000 * 1000L, false))) {
|
||||
LOG_WARN("Reschedule server id persistence task failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}//end namespace observer
|
||||
}//end namespace oceanbase
|
||||
|
||||
@ -52,6 +52,19 @@ private:
|
||||
ObHeartBeatProcess &hb_process_;
|
||||
};
|
||||
|
||||
class ObServerIdPersistTask : public common::ObTimerTask
|
||||
{
|
||||
public:
|
||||
ObServerIdPersistTask() : is_need_retry_(false) {};
|
||||
virtual ~ObServerIdPersistTask() {};
|
||||
virtual void runTimerTask();
|
||||
bool is_need_retry() const { return ATOMIC_LOAD(&is_need_retry_); }
|
||||
void disable_need_retry_flag() { ATOMIC_STORE(&is_need_retry_, false); }
|
||||
void enable_need_retry_flag() { ATOMIC_STORE(&is_need_retry_, true); }
|
||||
private:
|
||||
bool is_need_retry_;
|
||||
};
|
||||
|
||||
int try_reload_config(const int64_t config_version);
|
||||
int try_reload_time_zone_info(const int64_t time_zone_info_version);
|
||||
private:
|
||||
@ -63,6 +76,7 @@ private:
|
||||
const ObGlobalContext &gctx_;
|
||||
ObServerSchemaUpdater &schema_updater_;
|
||||
ObLeaseStateMgr &lease_state_mgr_;
|
||||
ObServerIdPersistTask server_id_persist_task_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObHeartBeatProcess);
|
||||
};
|
||||
|
||||
@ -1415,11 +1415,16 @@ int ObServer::init_config()
|
||||
config_.mysql_port.set_version(start_time_);
|
||||
}
|
||||
|
||||
if (opts_.local_ip_ && strlen(opts_.local_ip_) > 0) {
|
||||
config_.local_ip.set_value(opts_.local_ip_);
|
||||
config_.local_ip.set_version(start_time_);
|
||||
}
|
||||
|
||||
if (opts_.devname_ && strlen(opts_.devname_) > 0) {
|
||||
config_.devname.set_value(opts_.devname_);
|
||||
config_.devname.set_version(start_time_);
|
||||
} else {
|
||||
if (!has_config_file) {
|
||||
if (!has_config_file && 0 == strlen(config_.local_ip)) {
|
||||
const char *devname = get_default_if();
|
||||
if (devname && '\0' != devname[0]) {
|
||||
LOG_INFO("guess interface name", K(devname));
|
||||
@ -1491,6 +1496,49 @@ int ObServer::init_config()
|
||||
|
||||
config_.print();
|
||||
|
||||
// local_ip is a critical parameter, if if is set, then verify it; otherwise, set it via devname.
|
||||
if (strlen(config_.local_ip) > 0) {
|
||||
char if_name[MAX_IFNAME_LENGTH] = { '\0' };
|
||||
if (0 != obsys::ObNetUtil::get_ifname_by_addr(config_.local_ip, if_name, sizeof(if_name))) {
|
||||
// if it is incorrect, then ObServer should not be started.
|
||||
ret = OB_ERR_OBSERVER_START;
|
||||
LOG_DBA_ERROR(OB_ERR_OBSERVER_START, "local_ip is not a valid IP for this machine, local_ip", config_.local_ip.get_value());
|
||||
} else {
|
||||
if (0 != strcmp(config_.devname, if_name)) {
|
||||
// this is done to ensure the consistency of local_ip and devname.
|
||||
LOG_DBA_WARN(OB_ITEM_NOT_MATCH, "the devname has been rewritten, and the new value comes from local_ip, old value",
|
||||
config_.devname.get_value(), "new value", if_name, "local_ip", config_.local_ip.get_value());
|
||||
}
|
||||
// unconditionally call set_value to ensure that devname is written to the configuration file.
|
||||
config_.devname.set_value(if_name);
|
||||
config_.devname.set_version(start_time_);
|
||||
}
|
||||
} else {
|
||||
if (config_.use_ipv6) {
|
||||
char ipv6[MAX_IP_ADDR_LENGTH] = { '\0' };
|
||||
if (0 != obsys::ObNetUtil::get_local_addr_ipv6(config_.devname, ipv6, sizeof(ipv6))) {
|
||||
ret = OB_ERROR;
|
||||
_LOG_ERROR("call get_local_addr_ipv6 failed, devname:%s, errno:%d.", config_.devname.get_value(), errno);
|
||||
} else {
|
||||
config_.local_ip.set_value(ipv6);
|
||||
config_.local_ip.set_version(start_time_);
|
||||
_LOG_INFO("set local_ip via devname, local_ip:%s, devname:%s.", ipv6, config_.devname.get_value());
|
||||
}
|
||||
} else {
|
||||
uint32_t ipv4_binary = obsys::ObNetUtil::get_local_addr_ipv4(config_.devname);
|
||||
char ipv4[INET_ADDRSTRLEN] = { '\0' };
|
||||
if (nullptr == inet_ntop(AF_INET, (void *)&ipv4_binary, ipv4, sizeof(ipv4))) {
|
||||
ret = OB_ERROR;
|
||||
_LOG_ERROR("call inet_ntop failed, devname:%s, ipv4_binary:0x%08x, errno:%d.",
|
||||
config_.devname.get_value(), ipv4_binary, errno);
|
||||
} else {
|
||||
config_.local_ip.set_value(ipv4);
|
||||
config_.local_ip.set_version(start_time_);
|
||||
_LOG_INFO("set local_ip via devname, local_ip:%s, devname:%s.", ipv4, config_.devname.get_value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
// nop
|
||||
} else if (!is_arbitration_mode() && OB_FAIL(config_.strict_check_special())) {
|
||||
@ -1501,13 +1549,17 @@ int ObServer::init_config()
|
||||
LOG_ERROR("set running mode failed", KR(ret));
|
||||
} else {
|
||||
int32_t local_port = static_cast<int32_t>(config_.rpc_port);
|
||||
if (config_.use_ipv6) {
|
||||
if (strlen(config_.local_ip) > 0) {
|
||||
self_addr_.set_ip_addr(config_.local_ip, local_port);
|
||||
} else {
|
||||
if (config_.use_ipv6) {
|
||||
char ipv6[MAX_IP_ADDR_LENGTH] = { '\0' };
|
||||
obsys::ObNetUtil::get_local_addr_ipv6(config_.devname, ipv6, sizeof(ipv6));
|
||||
self_addr_.set_ip_addr(ipv6, local_port);
|
||||
} else {
|
||||
int32_t ipv4 = ntohl(obsys::ObNetUtil::get_local_addr_ipv4(config_.devname));
|
||||
self_addr_.set_ipv4_addr(ipv4, local_port);
|
||||
} else {
|
||||
int32_t ipv4 = ntohl(obsys::ObNetUtil::get_local_addr_ipv4(config_.devname));
|
||||
self_addr_.set_ipv4_addr(ipv4, local_port);
|
||||
}
|
||||
}
|
||||
|
||||
const char *syslog_file_info = ObServerUtils::build_syslog_file_info(self_addr_);
|
||||
@ -2135,6 +2187,7 @@ int ObServer::init_global_context()
|
||||
(void)gctx_.set_upgrade_stage(obrpc::OB_UPGRADE_STAGE_INVALID);
|
||||
|
||||
gctx_.flashback_scn_ = opts_.flashback_scn_;
|
||||
gctx_.server_id_ = config_.server_id;
|
||||
if ((PHY_FLASHBACK_MODE == gctx_.startup_mode_ || PHY_FLASHBACK_VERIFY_MODE == gctx_.startup_mode_)
|
||||
&& 0 >= gctx_.flashback_scn_) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
|
||||
@ -135,7 +135,8 @@ public:
|
||||
startup_mode_(NULL),
|
||||
log_level_(0),
|
||||
use_ipv6_(false),
|
||||
flashback_scn_(0)
|
||||
flashback_scn_(0),
|
||||
local_ip_(NULL)
|
||||
{
|
||||
}
|
||||
ObServerOptions(int rpc_port,
|
||||
@ -153,7 +154,8 @@ public:
|
||||
int8_t log_level,
|
||||
const char *mode,
|
||||
bool use_ipv6,
|
||||
int64_t flashback_scn)
|
||||
int64_t flashback_scn,
|
||||
const char *local_ip)
|
||||
{
|
||||
rpc_port_ = rpc_port;
|
||||
elect_port_ = elect_port;
|
||||
@ -171,6 +173,7 @@ public:
|
||||
log_level_ = log_level;
|
||||
use_ipv6_ = use_ipv6;
|
||||
flashback_scn_ = flashback_scn;
|
||||
local_ip_ = local_ip;
|
||||
}
|
||||
virtual ~ObServerOptions() {}
|
||||
|
||||
@ -190,6 +193,7 @@ public:
|
||||
int8_t log_level_;
|
||||
bool use_ipv6_;
|
||||
int64_t flashback_scn_;
|
||||
const char *local_ip_;
|
||||
};
|
||||
|
||||
enum ObServerMode {
|
||||
|
||||
@ -811,7 +811,7 @@ int ObIOCalibration::parse_calibration_table(ObIOAbility &io_ability)
|
||||
sqlclient::ObMySQLResult *result = nullptr;
|
||||
SMART_VAR(ObISQLClient::ReadResult, res) {
|
||||
ObSqlString sql_string;
|
||||
char ip_str[32] = { 0 };
|
||||
char ip_str[INET6_ADDRSTRLEN] = { 0 };
|
||||
const ObAddr &self_addr = OBSERVER.get_self();
|
||||
if (OB_UNLIKELY(!self_addr.ip_to_string(ip_str, sizeof(ip_str)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
||||
@ -54,7 +54,7 @@ DEF_INT(mysql_port, OB_CLUSTER_PARAMETER, "2881", "(1024,65536)",
|
||||
"port number for mysql connection. Range: (1024, 65536) in integer",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_STR(devname, OB_CLUSTER_PARAMETER, "bond0", "name of network adapter",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::READONLY));
|
||||
DEF_STR(zone, OB_CLUSTER_PARAMETER, "", "specifies the zone name",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_STR(ob_startup_mode, OB_CLUSTER_PARAMETER, "NORMAL", "specifies the observer startup mode",
|
||||
@ -1401,6 +1401,12 @@ DEF_BOOL(_enable_transaction_internal_routing, OB_TENANT_PARAMETER, "True",
|
||||
DEF_STR(_load_tde_encrypt_engine, OB_CLUSTER_PARAMETER, "NONE",
|
||||
"load the engine that meet the security classification requirement to encrypt data. default NONE",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_STR(local_ip, OB_CLUSTER_PARAMETER, "", "the IP address of the machine on which the ObServer will be installed",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::READONLY));
|
||||
DEF_INT(server_id, OB_CLUSTER_PARAMETER, "0", "[1, 65536]",
|
||||
"the unique id that been assigned by rootservice for each observer in cluster, "
|
||||
"default: 0 (invalid id), Range: [1, 65536]",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::READONLY));
|
||||
DEF_INT(_pipelined_table_function_memory_limit, OB_TENANT_PARAMETER, "524288000", "[1024,18446744073709551615]",
|
||||
"pipeline table function result set memory size limit. default 524288000 (500M), Range: [1024,18446744073709551615]",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
||||
Reference in New Issue
Block a user