/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #include #include #include "share/schema/ob_schema_service.h" #include "share/schema/ob_schema_service_sql_impl.h" #include "share/inner_table/ob_inner_table_schema.h" #include "rootserver/ob_schema2ddl_sql.h" #include "lib/string/ob_sql_string.h" #include "lib/mysqlclient/ob_mysql_transaction.h" #include "share/ob_global_stat_proxy.h" #include "observer/ob_server_struct.h" namespace oceanbase { namespace share { namespace schema { inline std::string DBInitializer::to_var_name(const std::string& from) const { std::string to = from; for (std::string::iterator it = to.begin(); it != to.end(); ++it) { if (!isalnum(*it)) { *it = '_'; } } return to; } std::string DBInitializer::get_env_value(const char* name, const char* def_val) { std::string val = def_val; const char* env_val = getenv(name); if (NULL != env_val) { val = env_val; } return val; } inline int DBInitializer::init() { int ret = common::OB_SUCCESS; GCTX.split_schema_version_ = OB_INVALID_VERSION; int64_t begin_us = ObTimeUtility::current_time(); if (OB_FAIL(mysql_host_detect())) { return ret; } int64_t end_us = ObTimeUtility::current_time(); _OB_LOG(INFO, "time to detect mysql server: %ld", end_us - begin_us); common::ObSqlString cmd; ret = cmd.assign_fmt("mysql -h %s -P %d -u %s %s%s -e " "'" "set global sql_mode = strict_all_tables;" "drop database if exists %s; create database if not exists %s;" "use %s; delimiter $$; create function time_to_usec(t timestamp) " "returns bigint(20) deterministic begin return unix_timestamp(t)*1000000; end$$ " "create function usec_to_time(u bigint(20)) " "returns timestamp deterministic begin return from_unixtime(u/1000000); end$$" "'", db_.host_.c_str(), db_.port_, db_.user_.c_str(), db_.pass_.size() > 0 ? "-p" : "", db_.pass_.c_str(), db_.db_.c_str(), db_.db_.c_str(), db_.db_.c_str()); if (OB_FAIL(ret)) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); return ret; } _OB_LOG(INFO, "init database by command %s", cmd.ptr()); ret = system(cmd.ptr()); if (0 != ret) { _OB_LOG(WARN, "execute cmd %s failed, ret %d", cmd.ptr(), ret); return ret; } sql_conn_pool_.set_db_param(db_.user_.c_str(), db_.pass_.c_str(), db_.db_.c_str()); common::ObAddr db_addr; db_addr.set_ip_addr(db_.host_.c_str(), db_.port_); _OB_LOG(INFO, "init sql connect pool, server %s, database name %s", common::to_cstring(db_addr), db_.db_.c_str()); ObConnPoolConfigParam param; // param.sqlclient_wait_timeout_ = 10; // 10s // turn up it, make unittest pass param.sqlclient_wait_timeout_ = 100; // 100s param.long_query_timeout_ = 120 * 1000 * 1000; // 120s param.connection_refresh_interval_ = 200 * 1000; // 200ms param.connection_pool_warn_time_ = 1 * 1000 * 1000; // 1s param.sqlclient_per_observer_conn_limit_ = 10; if (common::OB_SUCCESS != (ret = sql_conn_pool_.init(db_addr, param))) { _OB_LOG(WARN, "init sql connection pool failed, ret %d", ret); } else { sql_conn_pool_.set_mode(common::sqlclient::ObMySQLConnection::DEBUG_MODE); sql_proxy_.init(&sql_conn_pool_); } if (OB_SUCC(ret)) { if (OB_FAIL(db_gc())) { COMMON_LOG(INFO, "gc failed", K(ret)); } } return ret; } inline int DBInitializer::create_system_table(bool only_core_tables) { int ret = common::OB_SUCCESS; static const schema_create_func all_core_table_schema_creator[] = { &share::ObInnerTableSchema::all_core_table_schema, NULL}; const schema_create_func* creator_ptr_array[] = {all_core_table_schema_creator, share::core_table_schema_creators, (only_core_tables ? NULL : share::sys_table_schema_creators), NULL}; for (const schema_create_func** creator_ptr_ptr = creator_ptr_array; common::OB_SUCCESS == ret && NULL != *creator_ptr_ptr; ++creator_ptr_ptr) { for (const schema_create_func* creator_ptr = *creator_ptr_ptr; common::OB_SUCCESS == ret && NULL != *creator_ptr; ++creator_ptr) { share::schema::ObTableSchema table_schema; if (OB_SUCCESS != (ret = (*creator_ptr)(table_schema))) { _OB_LOG(WARN, "create table schema failed, ret %d", ret); ret = common::OB_SCHEMA_ERROR; } else if (table_schema.is_view_table()) { } else { int64_t affect_rows = 0; char sql[common::OB_MAX_SQL_LENGTH]; memset(sql, 0, sizeof(sql)); if (common::OB_SUCCESS != (ret = rootserver::ObSchema2DDLSql::convert(table_schema, sql, sizeof(sql)))) { _OB_LOG(WARN, "convert table schema to create table sql failed, ret %d", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql, affect_rows))) { _OB_LOG(WARN, "execute sql failed, ret %d, sql %s", ret, sql); } } } } if (OB_SUCC(ret)) { ObGlobalStatProxy proxy(sql_proxy_); if (OB_FAIL(proxy.set_init_value(1, 1, 1, 0, 0, 0, 1, 1, 1))) { _OB_LOG(WARN, "set init value failed, ret %d", ret); } } /* if (OB_SUCC(ret)) { if (OB_FAIL(log_init_core_operation())) { _OB_LOG(WARN, "log_int_core_operation failed, ret [%d]", ret); } } */ return ret; } inline int DBInitializer::fill_sys_stat_table(const uint64_t tenant_id) { int ret = common::OB_SUCCESS; // insert max_used_xxxx_id in __all_sys_stat table common::ObSqlString sql_string; int64_t affect_rows = 0; if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_table_id', 1, '%lu', 'max used table id')", tenant_id, combine_id(tenant_id, OB_MIN_USER_TABLE_ID))))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_tenant_id', 1, '%lu', 'max used tenant id')", tenant_id, OB_USER_TENANT_ID)))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_database_id', 1, '%lu', 'max used database id')", tenant_id, combine_id(tenant_id, OB_USER_DATABASE_ID))))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_tablegroup_id', 1, '%lu', 'max used tablegroup id')", tenant_id, combine_id(tenant_id, OB_USER_TABLEGROUP_ID))))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_user_id', 1, '%lu', 'max used user id')", tenant_id, combine_id(tenant_id, OB_USER_ID))))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } if (OB_SYS_TENANT_ID == tenant_id) { if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_unit_config_id', 1, '%lu', 'max used unit config id')", tenant_id, OB_USER_UNIT_CONFIG_ID)))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt( "insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_resource_pool_id', 1, '%lu', 'max used resource pool id')", tenant_id, OB_USER_RESOURCE_POOL_ID)))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_unit_id', 1, '%lu', 'max used unit id')", tenant_id, OB_USER_UNIT_ID)))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } if (OB_SUCC(ret)) { sql_string.reset(); if (common::OB_SUCCESS != (ret = (sql_string.append_fmt("insert into __all_sys_stat " "(tenant_id, zone, name, data_type, value, info) values " "(%lu, '', 'ob_max_used_server_id', 1, '%lu', 'max used server id')", tenant_id, OB_INIT_SERVER_ID)))) { _OB_LOG(WARN, "sql_string assign string failed, ret[%d]", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql_string.ptr(), affect_rows))) { _OB_LOG(WARN, "execute sql failed, sql[%s], ret[%d]", sql_string.ptr(), ret); } } } return ret; } inline int DBInitializer::create_tenant_space_tables(const uint64_t tenant_id) { int ret = common::OB_SUCCESS; std::string db = get_db_name(); ObSqlString sql; int64_t affect_rows = 0; if (OB_FAIL(sql.assign_fmt("drop database if exists %s_%lu", db.c_str(), tenant_id))) { _OB_LOG(WARN, "assign sql failed, ret %d", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql.ptr(), affect_rows))) { _OB_LOG(WARN, "execute create database sql failed, ret %d, sql %s", ret, sql.ptr()); } else if (common::OB_SUCCESS != (ret = sql.assign_fmt("create database %s_%lu", db.c_str(), tenant_id))) { _OB_LOG(WARN, "assign sql failed, ret %d", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql.ptr(), affect_rows))) { _OB_LOG(WARN, "execute create database sql failed, ret %d, sql %s", ret, sql.ptr()); } else { const schema_create_func* creator_ptr_array[] = { share::core_table_schema_creators, share::sys_table_schema_creators}; ObTableSchema tables[ARRAYSIZEOF(core_table_schema_creators) + ARRAYSIZEOF(sys_table_schema_creators)]; int64_t idx = 0; // build system table schema to %tables for (int64_t i = 0; common::OB_SUCCESS == ret && i < ARRAYSIZEOF(creator_ptr_array); i++) { for (const schema_create_func* creator_ptr = creator_ptr_array[i]; common::OB_SUCCESS == ret && NULL != *creator_ptr; ++creator_ptr) { if (OB_SUCCESS != (ret = (*creator_ptr)(tables[idx++]))) { _OB_LOG(WARN, "create table schema fialed, ret %d", ret); ret = common::OB_SCHEMA_ERROR; } } } for (int64_t i = 0; common::OB_SUCCESS == ret && i < ARRAYSIZEOF(tenant_space_tables) && !is_virtual_table(tenant_space_tables[i]); i++) { uint64_t tid = tenant_space_tables[i]; // found table schema for (idx = 0; idx < ARRAYSIZEOF(tables); ++idx) { if (extract_pure_id(tables[idx].get_table_id()) == tid) { break; } } char csql[common::OB_MAX_SQL_LENGTH]; memset(csql, 0, sizeof(csql)); if (tables[idx].is_view_table()) { } else if (common::OB_SUCCESS != (ret = rootserver::ObSchema2DDLSql::convert(tables[idx], csql, sizeof(csql)))) { _OB_LOG(WARN, "convert table schema to create table sql failed, ret %d", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(tenant_id, csql, affect_rows))) { _OB_LOG(WARN, "execute sql failed, ret %d, sql %s", ret, csql); } } } return ret; } inline int DBInitializer::create_tenant_space(const uint64_t tenant_id) { int ret = common::OB_SUCCESS; if (OB_FAIL(fill_sys_stat_table(tenant_id))) { _OB_LOG(WARN, "fill sys stat table failed, ret %d", ret); } return ret; /* * We move system table to system tenant, only fill sys stat table here. * int ret = common::OB_SUCCESS; std::string db = get_db_name(); ObSqlString sql; int64_t affect_rows = 0; if (OB_FAIL(sql.assign_fmt("drop database if exists %s_%lu", db.c_str(), tenant_id))) { _OB_LOG(WARN, "assign sql failed, ret %d", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql.ptr(), affect_rows))) { _OB_LOG(WARN, "execute create database sql failed, ret %d, sql %s", ret, sql.ptr()); } else if (common::OB_SUCCESS != (ret = sql.assign_fmt("create database %s_%lu", db.c_str(), tenant_id))) { _OB_LOG(WARN, "assign sql failed, ret %d", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write(sql.ptr(), affect_rows))) { _OB_LOG(WARN, "execute create database sql failed, ret %d, sql %s", ret, sql.ptr()); } else { const schema_create_func *creator_ptr_array[] = { share::core_table_schema_creators, share::sys_table_schema_creators }; ObTableSchema tables[ARRAYSIZEOF(core_table_schema_creators) + ARRAYSIZEOF(sys_table_schema_creators)]; int64_t idx = 0; // build system table schema to %tables for (int64_t i = 0; common::OB_SUCCESS == ret && i < ARRAYSIZEOF(creator_ptr_array); i++) { for (const schema_create_func *creator_ptr = creator_ptr_array[i]; common::OB_SUCCESS == ret && NULL != *creator_ptr; ++creator_ptr) { if (OB_SUCCESS != (ret = (*creator_ptr)(tables[idx++]))) { _OB_LOG(WARN, "create table schema fialed, ret %d", ret); ret = common::OB_SCHEMA_ERROR; } } } for (int64_t i = 0; common::OB_SUCCESS == ret && i < ARRAYSIZEOF(tenant_space_tables) && !is_virtual_table(tenant_space_tables[i]); i++) { uint64_t tid = tenant_space_tables[i]; // found table schema for (idx = 0; idx < ARRAYSIZEOF(tables); ++idx) { if (extract_pure_id(tables[idx].get_table_id()) == tid) { break; } } char csql[common::OB_MAX_SQL_LENGTH]; memset(csql, 0, sizeof(csql)); if (tables[idx].is_view_table()) { } else if (common::OB_SUCCESS != (ret = rootserver::ObSchema2DDLSql::convert( tables[idx], csql, sizeof(csql)))) { _OB_LOG(WARN, "convert table schema to create table sql failed, ret %d", ret); } else if (common::OB_SUCCESS != (ret = sql_proxy_.write( tenant_id, csql, affect_rows))) { _OB_LOG(WARN, "execute sql failed, ret %d, sql %s", ret, csql); } } } if (OB_SUCC(ret)) { if (common::OB_SUCCESS != (ret = fill_sys_stat_table(tenant_id))) { _OB_LOG(WARN, "fill sys stat table failed, ret %d", ret); } } return ret; */ } inline int DBInitializer::create_virtual_table(const schema_create_func create_func) { int ret = OB_SUCCESS; ObTableSchema table_schema; char sql[common::OB_MAX_SQL_LENGTH]; memset(sql, 0, sizeof(sql)); int64_t affected_rows = 0; if (OB_FAIL(create_func(table_schema))) { _OB_LOG(WARN, "create table schema failed, ret %d", ret); } else { table_schema.set_part_level(PARTITION_LEVEL_ZERO); if (OB_FAIL(rootserver::ObSchema2DDLSql::convert(table_schema, sql, sizeof(sql)))) { _OB_LOG(WARN, "convert table schema to sql failed, ret %d", ret); } else if (OB_FAIL(sql_proxy_.write(sql, affected_rows))) { _OB_LOG(WARN, "execute sql failed, ret %d, sql %s", ret, sql); } } return ret; } inline int DBInitializer::log_init_core_operation() { int ret = OB_SUCCESS; int64_t affected_rows = 0; ret = sql_proxy_.write("insert into __all_core_table (table_name, row_id, column_name, column_value) " "values ('__all_ddl_operation', 0, 'core_schema_version', '0')", affected_rows); if (OB_FAIL(ret)) { _OB_LOG(WARN, "execute sql failed, ret %d", ret); } return ret; } inline int DBInitializer::mysql_host_detect() { int ret = OB_SUCCESS; std::vector dbs; DBConfig db; db.host_ = get_env_value("MYSQL_HOST", "100.81.152.48"); std::string port = get_env_value("MYSQL_PORT", "3306"); db.port_ = atoi(port.c_str()); db.user_ = get_env_value("MYSQL_USER", "root"); db.pass_ = get_env_value("MYSQL_PASS", ""); dbs.push_back(db); std::string fix_mysql_host = get_env_value("FIX_MYSQL_HOST", ""); if (fix_mysql_host.empty()) { /* "100.81.152.31", "100.81.152.32", "100.81.152.33", "100.81.152.34", "100.81.152.35", "100.81.152.36", "100.81.152.37", "100.81.152.38", "100.81.152.39", "100.81.152.40", "100.81.152.41", "100.81.152.42", "100.81.152.43", "100.81.152.44", */ const char* extra_hosts[] = {"100.81.152.48"}; for (int64_t i = 0; i < ARRAYSIZEOF(extra_hosts); ++i) { db.host_ = extra_hosts[i]; dbs.push_back(db); } std::random_shuffle(dbs.begin(), dbs.end()); } common::ObSqlString cmd; db_ = dbs.at(0); FOREACH(d, dbs) { cmd.assign_fmt( "x=$(mysql -h %s -P%d -u%s -P '%s' -e 'select version()' 2>/dev/null| tail -n 1) ; [[ \"$x\" > \"5.6\" ]]", d->host_.c_str(), d->port_, d->user_.c_str(), d->pass_.c_str()); if (0 == system(cmd.ptr())) { db_ = *d; break; } } // generate database name std::string host = get_env_value("HOSTNAME", ""); const std::string user = get_env_value("USER", ""); // avoid mysql identifier name is too long error if (user.length() + host.length() > 28) { host = std::string(""); } const int64_t now = ObTimeUtility::current_time(); const int64_t pid = getpid(); if (!fix_mysql_host.empty()) { cmd.assign_fmt("_unittest_%s", user.c_str()); } else { cmd.assign_fmt("_unittest_%ld_%s_%s_%ld", now, user.c_str(), host.c_str(), pid); } db_.db_ = to_var_name(std::string(cmd.ptr())); COMMON_LOG(INFO, "USE MYSQL SERVER", "host", db_.host_.c_str(), "port", db_.port_, "user", db_.user_.c_str(), "database", db_.db_.c_str()); return ret; } inline int DBInitializer::db_gc() { int ret = OB_SUCCESS; const char* prefix = "_unittest_"; std::vector dbs; { common::ObMySQLProxy::MySQLResult res; if (common::OB_SUCCESS == sql_proxy_.read(res, "show databases")) { while (common::OB_SUCCESS == res.get_result()->next()) { common::ObString v; res.get_result()->get_varchar(0L, v); if (strncmp(prefix, v.ptr(), std::min((size_t)strlen(prefix), (size_t)v.length())) == 0) { std::string db(v.ptr(), v.length()); dbs.push_back(db); } } } } FOREACH(db, dbs) { const int64_t create_time = atol(db->c_str() + strlen(prefix)); if (create_time > 0 && create_time + 24 * 60 * 60 * 100000L < ObTimeUtility::current_time()) { int64_t affacted_row = 0; std::string cmd = std::string("drop database if exists ") + *db; sql_proxy_.write(cmd.c_str(), affacted_row); } } return ret; } inline DBInitializer::~DBInitializer() { std::string fix_mysql_host = get_env_value("FIX_MYSQL_HOST", ""); if (fix_mysql_host.empty() && !db_.db_.empty()) { int64_t affacted_row = 0; std::string cmd = std::string("drop database if exists ") + db_.db_; sql_proxy_.set_active(); sql_proxy_.write(cmd.c_str(), affacted_row); } COMMON_LOG(INFO, "USE MYSQL SERVER", "host", db_.host_.c_str(), "port", db_.port_, "user", db_.user_.c_str(), "database", db_.db_.c_str()); } } // end namespace schema } // end namespace share } // end namespace oceanbase