Files
oceanbase/mittest/multi_replica/env/ob_simple_replica.cpp

445 lines
15 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define private public
#define protected public
#include "storage/tx_storage/ob_ls_service.h"
#undef private
#undef protected
#include "ob_simple_replica.h"
#include "lib/allocator/ob_libeasy_mem_pool.h"
#include "ob_mittest_utils.h"
namespace oceanbase
{
namespace observer
{
uint32_t get_local_addr(const char *dev_name)
{
int fd, intrface;
struct ifreq buf[16];
struct ifconf ifc;
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
return 0;
}
ifc.ifc_len = sizeof(buf);
ifc.ifc_buf = (caddr_t)buf;
if (ioctl(fd, SIOCGIFCONF, (char *)&ifc) != 0) {
close(fd);
return 0;
}
intrface = static_cast<int>(ifc.ifc_len / sizeof(struct ifreq));
while (intrface-- > 0) {
if (ioctl(fd, SIOCGIFFLAGS, (char *)&buf[intrface]) != 0) {
continue;
}
if ((buf[intrface].ifr_flags & IFF_LOOPBACK) != 0)
continue;
if (!(buf[intrface].ifr_flags & IFF_UP))
continue;
if (dev_name != NULL && strcmp(dev_name, buf[intrface].ifr_name))
continue;
if (!(ioctl(fd, SIOCGIFADDR, (char *)&buf[intrface]))) {
close(fd);
return ((struct sockaddr_in *)(&buf[intrface].ifr_addr))->sin_addr.s_addr;
}
}
close(fd);
return 0;
}
int64_t ObSimpleServerReplica::get_rpc_port(int &server_fd)
{
return unittest::get_rpc_port(server_fd);
}
ObSimpleServerReplica::ObSimpleServerReplica(const std::string &app_name,
const std::string &env_prefix,
const int zone_id,
const int rpc_port,
const std::string &rs_list,
const ObServerInfoList &server_list,
bool is_restart,
ObServer &server,
const std::string &dir_prefix,
const char *log_disk_size,
const char *memory_limit)
: server_(server), zone_id_(zone_id), rpc_port_(rpc_port), rs_list_(rs_list),
server_info_list_(server_list), app_name_(app_name), data_dir_(dir_prefix),
run_dir_(env_prefix), log_disk_size_(log_disk_size), memory_limit_(memory_limit),
is_restart_(is_restart)
{
// if (ObSimpleServerReplicaRestartHelper::is_restart_) {
// std::string port_file_name = run_dir_ + std::string("/port.txt");
// FILE *infile = nullptr;
// if (nullptr == (infile = fopen(port_file_name.c_str(), "r"))) {
// ob_abort();
// }
// fscanf(infile, "%d\n", &rpc_port_);
// } else {
// rpc_port_ = unittest::get_rpc_port(server_fd_);
// }
mysql_port_ = rpc_port_ + 1;
}
std::string ObSimpleServerReplica::get_local_ip()
{
uint32_t ip = get_local_addr("bond0");
if (ip == 0) {
ip = get_local_addr("eth0");
}
if (ip == 0) {
return "";
}
return inet_ntoa(*(struct in_addr *)(&ip));
}
int ObSimpleServerReplica::simple_init()
{
int ret = OB_SUCCESS;
local_ip_ = get_local_ip();
if (local_ip_ == "") {
SERVER_LOG(WARN, "get_local_ip failed");
return -666666666;
}
easy_pool_set_allocator(ob_easy_realloc);
ev_set_allocator(ob_easy_realloc);
std::string zone_str = "zone" + std::to_string(zone_id_);
ObServerOptions opts;
opts.cluster_id_ = 1;
opts.rpc_port_ = rpc_port_;
opts.mysql_port_ = mysql_port_;
opts.data_dir_ = data_dir_.c_str();
opts.zone_ = zone_str.c_str();
opts.appname_ = "test_ob";
opts.rs_list_ = rs_list_.c_str();
// NOTE: memory_limit must keep same with log_disk_size
optstr_ = std::string();
optstr_ = optstr_ + "log_disk_size=" + std::string(log_disk_size_)
+ ",memory_limit=" + std::string(memory_limit_)
+ ",cache_wash_threshold=1G,net_thread_count=4,cpu_count=16,schema_history_expire_time="
"1d,workers_per_cpu_quota=10,datafile_disk_percentage=2,__min_full_resource_pool_"
"memory=2147483648,system_memory=5G,trace_log_slow_query_watermark=100ms,datafile_"
"size=10G,stack_size=512K";
opts.optstr_ = optstr_.c_str();
// opts.devname_ = "eth0";
opts.use_ipv6_ = false;
char *curr_dir = get_current_dir_name();
if (OB_FAIL(chdir(run_dir_.c_str()))) {
SERVER_LOG(WARN, "change dir failed.", KR(ret), K(curr_dir), K(run_dir_.c_str()), K(errno));
} else {
SERVER_LOG(INFO, "change dir done.", K(curr_dir), K(run_dir_.c_str()));
}
fprintf(stdout,
"[PID:%d] init opt : zone_id = %d, rpc_port = %d, mysql_port = %d, zone = %s, "
"all_server_count = "
"%ld, rs_list = %s\n",
getpid(), zone_id_, rpc_port_, mysql_port_, zone_str.c_str(), server_info_list_.count(),
rs_list_.c_str());
// 因为改变了工作目录,设置为绝对路径
for (int i = 0; i < MAX_FD_FILE; i++) {
int len = strlen(OB_LOGGER.log_file_[i].filename_);
if (len > 0) {
std::string cur_file_name = OB_LOGGER.log_file_[i].filename_;
cur_file_name = cur_file_name.substr(cur_file_name.find_last_of("/\\") + 1);
std::string ab_file = std::string(curr_dir) + "/" + run_dir_ + "/"
+ cur_file_name;
SERVER_LOG(INFO, "convert ab file", K(ab_file.c_str()));
MEMCPY(OB_LOGGER.log_file_[i].filename_, ab_file.c_str(), ab_file.size());
}
}
// std::string ab_file = std::string(curr_dir) + "/" + run_dir_ + "/" + app_name_;
//
// std::string app_log_name = ab_file + ".log";
// std::string app_rs_log_name = ab_file + "_rs.log";
// std::string app_ele_log_name = ab_file + "_election.log";
// std::string app_trace_log_name = ab_file + "_trace.log";
// OB_LOGGER.set_file_name(app_log_name.c_str(),
// true,
// false,
// app_rs_log_name.c_str(),
// app_ele_log_name.c_str(),
// app_trace_log_name.c_str());
ObPLogWriterCfg log_cfg;
ret = server_.init(opts, log_cfg);
if (OB_FAIL(ret)) {
return ret;
}
ret = init_sql_proxy();
if (OB_SUCC(ret)) {
if (OB_FAIL(bootstrap_client_.init())) {
SERVER_LOG(WARN, "client init failed", K(ret));
} else if (OB_FAIL(bootstrap_client_.get_proxy(bootstrap_srv_proxy_))) {
SERVER_LOG(WARN, "get_proxy failed", K(ret));
}
}
return ret;
}
int ObSimpleServerReplica::init_sql_proxy()
{
int ret = OB_SUCCESS;
sql_conn_pool_.set_db_param("root@sys", "", "test");
common::ObAddr db_addr;
db_addr.set_ip_addr(local_ip_.c_str(), mysql_port_);
ObConnPoolConfigParam param;
//param.sqlclient_wait_timeout_ = 10; // 10s
// turn up it, make unittest pass
param.sqlclient_wait_timeout_ = 1000; // 300s
param.long_query_timeout_ = 300*1000*1000; // 120s
param.connection_refresh_interval_ = 200*1000; // 200ms
param.connection_pool_warn_time_ = 10*1000*1000; // 1s
param.sqlclient_per_observer_conn_limit_ = 1000;
ret = sql_conn_pool_.init(db_addr, param);
if (OB_SUCC(ret)) {
sql_conn_pool_.set_mode(common::sqlclient::ObMySQLConnection::DEBUG_MODE);
ret = sql_proxy_.init(&sql_conn_pool_);
}
return ret;
}
int ObSimpleServerReplica::init_sql_proxy2(const char *tenant_name, const char *db_name, const bool oracle_mode)
{
int ret = OB_SUCCESS;
std::string user = oracle_mode ? "sys@" : "root@";
sql_conn_pool2_.set_db_param((user + std::string(tenant_name)).c_str(), "", db_name);
common::ObAddr db_addr;
db_addr.set_ip_addr(local_ip_.c_str(), mysql_port_);
ObConnPoolConfigParam param;
//param.sqlclient_wait_timeout_ = 10; // 10s
// turn up it, make unittest pass
param.sqlclient_wait_timeout_ = 1000; // 100s
param.long_query_timeout_ = 300*1000*1000; // 120s
param.connection_refresh_interval_ = 200*1000; // 200ms
param.connection_pool_warn_time_ = 10*1000*1000; // 1s
param.sqlclient_per_observer_conn_limit_ = 1000;
ret = sql_conn_pool2_.init(db_addr, param);
if (OB_SUCC(ret)) {
sql_conn_pool2_.set_mode(common::sqlclient::ObMySQLConnection::DEBUG_MODE);
ret = sql_proxy2_.init(&sql_conn_pool2_);
}
return ret;
}
int ObSimpleServerReplica::init_sql_proxy_with_short_wait()
{
int ret = OB_SUCCESS;
sql_conn_pool_with_short_wait_.set_db_param("root@sys", "", "test");
common::ObAddr db_addr;
db_addr.set_ip_addr(local_ip_.c_str(), mysql_port_);
ObConnPoolConfigParam param;
//param.sqlclient_wait_timeout_ = 10; // 10s
// turn up it, make unittest pass
param.sqlclient_wait_timeout_ = 3; // 3s
param.long_query_timeout_ = 3*1000*1000; // 3s
param.connection_refresh_interval_ = 200*1000; // 200ms
param.connection_pool_warn_time_ = 10*1000*1000; // 1s
param.sqlclient_per_observer_conn_limit_ = 1000;
ret = sql_conn_pool_with_short_wait_.init(db_addr, param);
if (OB_SUCC(ret)) {
sql_conn_pool_with_short_wait_.set_mode(common::sqlclient::ObMySQLConnection::DEBUG_MODE);
ret = sql_proxy_with_short_wait_.init(&sql_conn_pool_with_short_wait_);
}
return ret;
}
int ObSimpleServerReplica::simple_start()
{
int ret = OB_SUCCESS;
// bootstrap
if (zone_id_ == 1 && !is_restart_) {
std::thread th([this]() {
int64_t start_time = ObTimeUtility::current_time();
int ret = OB_SUCCESS;
int64_t curr_time = ObTimeUtility::current_time();
while (curr_time - start_time < 5 * 60 * 1000 * 1000) {
ret = this->bootstrap();
if (OB_SUCC(ret)) {
break;
}
::usleep(200 * 1000);
curr_time = ObTimeUtility::current_time();
}
SERVER_LOG(INFO, "ObSimpleServerReplica bootstrap th exit", K(ret), K(zone_id_), K(rpc_port_),
K(mysql_port_));
});
th_ = std::move(th);
}
SERVER_LOG(INFO, "ObSimpleServerReplica init succ prepare to start...", K(zone_id_), K(rpc_port_),
K(mysql_port_));
ret = server_.start();
if (zone_id_ == 1 && !is_restart_) {
th_.join();
fprintf(stdout, "[BOOTSTRAP SUCC] zone_id = %d, rpc_port = %d, mysql_port = %d\n", zone_id_,
rpc_port_, mysql_port_);
}
if (OB_SUCC(ret)) {
SERVER_LOG(INFO, "ObSimpleServerReplica start succ", K(zone_id_), K(rpc_port_), K(mysql_port_));
fprintf(stdout, "[START OBSERVER SUCC] zone_id = %d, rpc_port = %d, mysql_port = %d\n",
zone_id_, rpc_port_, mysql_port_);
} else {
SERVER_LOG(WARN, "ObSimpleServerReplica start failed", K(ret), K(zone_id_), K(rpc_port_),
K(mysql_port_));
// fprintf(stdout, "start failed. ret = %d\n", ret);
ob_abort();
}
return ret;
}
int ObSimpleServerReplica::bootstrap()
{
SERVER_LOG(INFO, "ObSimpleServerReplica::bootstrap start", K(zone_id_), K(rpc_port_), K(mysql_port_));
int ret = OB_SUCCESS;
/*
if (server_.get_gctx().ob_service_ == nullptr) {
ret = -66666666;
SERVER_LOG(INFO, "observice is nullptr");
} else {
// observer内部有线程的检查, 这里在新建线程下调用会有问题
obrpc::ObServerInfo server_info;
server_info.zone_ = "zone1";
server_info.server_ = common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), rpc_port_);
server_info.region_ = "sys_region";
obrpc::ObBootstrapArg arg;
arg.cluster_role_ = common::PRIMARY_CLUSTER;
arg.server_list_.push_back(server_info);
SERVER_LOG(INFO, "observice.bootstrap call", K(arg), K(ret));
ret = server_.get_gctx().ob_service_->bootstrap(arg);
SERVER_LOG(INFO, "observice.bootstrap return", K(arg), K(ret));
}
*/
// obrpc::ObNetClient client;
// obrpc::ObSrvRpcProxy srv_proxy;
// } else {
const int64_t timeout = 180 * 1000 * 1000; //180s
common::ObAddr dst_server(common::ObAddr::IPV4, local_ip_.c_str(), rpc_port_);
bootstrap_srv_proxy_.set_server(dst_server);
bootstrap_srv_proxy_.set_timeout(timeout);
// obrpc::ObServerInfo server_info;
// std::string zone_str = "zone" +std::tostrin
// server_info.zone_ = "";
// server_info.server_ = common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), rpc_port_);
// server_info.region_ = "sys_region";
obrpc::ObBootstrapArg arg;
arg.cluster_role_ = common::PRIMARY_CLUSTER;
arg.server_list_.assign(server_info_list_);
if (OB_FAIL(bootstrap_srv_proxy_.bootstrap(arg))) {
SERVER_LOG(WARN, "bootstrap failed", K(arg), K(ret));
}
// }
SERVER_LOG(INFO, "ObSimpleServerReplica::bootstrap end", K(ret), K(zone_id_), K(rpc_port_), K(mysql_port_));
return ret;
}
int ObSimpleServerReplica::simple_close()
{
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close start");
int ret = OB_SUCCESS;
// remove ls for exit
/*
ObSEArray<uint64_t, 16> tenant_ids;
GCTX.omt_->get_mtl_tenant_ids(tenant_ids);
auto do_remove_ls = [] (uint64_t tenant_id) {
int ret = OB_SUCCESS;
share::ObTenantSwitchGuard guard;
ObLS *ls;
if (OB_SUCC(guard.switch_to(tenant_id))) {
ObSEArray<share::ObLSID, 10> ls_ids;
common::ObSharedGuard<storage::ObLSIterator> ls_iter;
if (OB_SUCC(MTL(ObLSService*)->get_ls_iter(ls_iter, ObLSGetMod::STORAGE_MOD))) {
while (true) {
if (OB_SUCC(ls_iter->get_next(ls))) {
ls_ids.push_back(ls->get_ls_id());
} else {
break;
}
}
}
ls_iter.reset();
SERVER_LOG(INFO, "safe quit need remove ls", K(MTL_ID()), K(ls_ids));
for (int i = 0; i < ls_ids.count(); i++) {
if (ls_ids.at(i).id() > share::ObLSID::SYS_LS_ID) {
MTL(ObLSService*)->remove_ls(ls_ids.at(i));
}
}
MTL(ObLSService*)->remove_ls(share::ObLSID{share::ObLSID::SYS_LS_ID});
}
};
for (int64_t i = 0; i < tenant_ids.count(); i++) {
if (tenant_ids.at(i) != OB_SYS_TENANT_ID) {
do_remove_ls(tenant_ids.at(i));
}
}
do_remove_ls(OB_SYS_TENANT_ID);
*/
sql_conn_pool_.stop();
sql_conn_pool_.close_all_connection();
sql_conn_pool2_.stop();
sql_conn_pool2_.close_all_connection();
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close set_stop");
server_.set_stop();
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close wait");
ret = server_.wait();
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close destroy");
server_.destroy();
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close destroy");
ObKVGlobalCache::get_instance().destroy();
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close destroy");
ObVirtualTenantManager::get_instance().destroy();
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close end", K(ret));
SERVER_LOG(INFO, "ObSimpleServerReplica::simple_close end", K(ret));
return ret;
}
void ObSimpleServerReplica::reset()
{
}
} // end observer
} // end oceanbase