fix stop server may failed to check log is sync
This commit is contained in:
@ -7538,52 +7538,14 @@ int ObRootService::generate_stop_server_log_in_sync_dest_server_array(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::generate_log_in_sync_dest_server_buff(common::ObIAllocator& allocator,
|
||||
const common::ObIArray<common::ObAddr>& dest_server_array, common::ObString& dest_server_buff)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (OB_UNLIKELY(dest_server_array.count() <= 0)) {
|
||||
ret = OB_SERVER_NOT_ALIVE; // dest_server_array is empty, although it can be processed, but still throw error
|
||||
LOG_WARN("all server not alive", K(ret));
|
||||
} else {
|
||||
char* buf = NULL;
|
||||
const int64_t buf_len = dest_server_array.count() * (MAX_IP_ADDR_LENGTH + 2) + 1;
|
||||
if (OB_UNLIKELY(nullptr == (buf = static_cast<char*>(allocator.alloc(buf_len))))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc buf", K(ret), K(buf));
|
||||
} else {
|
||||
int64_t pos = 0;
|
||||
bool first = true;
|
||||
MEMSET(static_cast<void*>(buf), 0, buf_len);
|
||||
for (int64_t index = 0; OB_SUCC(ret) && index < dest_server_array.count(); ++index) {
|
||||
const common::ObAddr& server = dest_server_array.at(index);
|
||||
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%s", first ? "" : ","))) {
|
||||
LOG_WARN("fail to do databuff printf", K(ret));
|
||||
} else {
|
||||
first = false;
|
||||
pos += server.to_string(buf + pos, buf_len - pos);
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
dest_server_buff.assign_ptr(buf, pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::check_is_log_sync(bool& is_log_sync, const common::ObIArray<ObAddr>& stop_server)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator alloc;
|
||||
is_log_sync = true;
|
||||
common::ObZone zone; /*empty zone*/
|
||||
common::ObArray<common::ObAddr> alive_server_array;
|
||||
common::ObArray<common::ObAddr> dest_server_array;
|
||||
common::ObString dest_server_buff;
|
||||
ObSqlString sql;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
@ -7596,10 +7558,9 @@ int ObRootService::check_is_log_sync(bool& is_log_sync, const common::ObIArray<O
|
||||
K(alive_server_array),
|
||||
K(stop_server),
|
||||
K(dest_server_array));
|
||||
} else if (OB_FAIL(generate_log_in_sync_dest_server_buff(alloc, dest_server_array, dest_server_buff))) {
|
||||
LOG_WARN("fail to generate log in sync dest server buff", K(ret), K(dest_server_array), K(dest_server_buff));
|
||||
} else if (OB_FAIL(generate_log_in_sync_sql(dest_server_array, sql))) {
|
||||
LOG_WARN("fail to generate log in sync dest server buff", K(ret), K(dest_server_array), K(sql));
|
||||
} else {
|
||||
ObSqlString sql;
|
||||
HEAP_VAR(ObMySQLProxy::MySQLResult, res)
|
||||
{
|
||||
sqlclient::ObMySQLResult* result = NULL;
|
||||
@ -7608,12 +7569,6 @@ int ObRootService::check_is_log_sync(bool& is_log_sync, const common::ObIArray<O
|
||||
// pay attention partition_idx actually is partition_id
|
||||
if (OB_FAIL(schema_service_->get_schema_guard(schema_guard))) {
|
||||
LOG_WARN("fail to get schema guard", K(ret));
|
||||
} else if (OB_FAIL(sql.assign_fmt("SELECT table_id, partition_idx FROM %s WHERE is_in_sync = 0 and "
|
||||
"is_offline = 0 and replica_type != 16 "
|
||||
"and concat(svr_ip, \":\", svr_port) in (%s)",
|
||||
OB_ALL_VIRTUAL_CLOG_STAT_TNAME,
|
||||
dest_server_buff.ptr()))) {
|
||||
LOG_WARN("assign_fmt failed", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy_.read(res, sql.ptr()))) {
|
||||
LOG_WARN("execute sql failed", K(ret), K(sql));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
@ -7836,6 +7791,48 @@ int ObRootService::admin_switch_replica_role(const obrpc::ObAdminSwitchReplicaRo
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::generate_log_in_sync_sql(
|
||||
const common::ObIArray<common::ObAddr> &dest_server_array, common::ObSqlString &sql)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
sql.reset();
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret));
|
||||
} else if (OB_UNLIKELY(dest_server_array.count() <= 0)) {
|
||||
ret = OB_SERVER_NOT_ALIVE; // all server not alive
|
||||
LOG_WARN("all server not alive", KR(ret));
|
||||
} else if (OB_FAIL(sql.assign_fmt("SELECT table_id, partition_idx FROM %s WHERE is_in_sync = 0 and "
|
||||
"is_offline = 0 and replica_type != 16 "
|
||||
"and (svr_ip, svr_port) in (",
|
||||
OB_ALL_VIRTUAL_CLOG_STAT_TNAME))) {
|
||||
LOG_WARN("assign_fmt failed", KR(ret), K(sql));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < dest_server_array.count(); ++i) {
|
||||
const common::ObAddr &addr = dest_server_array.at(i);
|
||||
char ip_str[MAX_IP_PORT_LENGTH] = "";
|
||||
if (0 != i) {
|
||||
if (OB_FAIL(sql.append(","))) {
|
||||
LOG_WARN("failed to append fmt sql", KR(ret), K(i), K(sql));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!addr.ip_to_string(ip_str, MAX_IP_PORT_LENGTH)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to ip to string", KR(ret), K(ip_str), K(addr));
|
||||
} else if (OB_FAIL(sql.append_fmt("(\"%s\", %d)", ip_str, addr.get_port()))) {
|
||||
LOG_WARN("failed to append fmt sql", KR(ret), K(i), K(ip_str), K(addr), K(sql));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(sql.append(")"))) {
|
||||
LOG_WARN("failed to append fmt sql", KR(ret), K(sql));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObRootService::admin_switch_rs_role(const obrpc::ObAdminSwitchRSRoleArg& arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -1166,16 +1166,15 @@ public:
|
||||
int logical_restore_partitions(const obrpc::ObRestorePartitionsArg& arg);
|
||||
|
||||
int check_is_log_sync(bool& is_log_sync, const common::ObIArray<common::ObAddr>& stop_server);
|
||||
int generate_log_in_sync_sql(const common::ObIArray<common::ObAddr> &dest_server_array, common::ObSqlString &sql);
|
||||
int generate_stop_server_log_in_sync_dest_server_array(const common::ObIArray<common::ObAddr>& alive_server_array,
|
||||
const common::ObIArray<common::ObAddr>& excluded_server_array,
|
||||
common::ObIArray<common::ObAddr>& dest_server_array);
|
||||
int generate_log_in_sync_dest_server_buff(common::ObIAllocator& allocator,
|
||||
const common::ObIArray<common::ObAddr>& dest_server_array, common::ObString& dest_server_buff);
|
||||
int log_nop_operation(const obrpc::ObDDLNopOpreatorArg& arg);
|
||||
int broadcast_schema(const obrpc::ObBroadcastSchemaArg& arg);
|
||||
int check_other_rs_exist(bool& is_other_rs_exist);
|
||||
ObRsGtsManager& get_rs_gts_manager()
|
||||
{
|
||||
{
|
||||
return rs_gts_manager_;
|
||||
}
|
||||
ObFreezeInfoManager& get_freeze_manager()
|
||||
|
||||
Reference in New Issue
Block a user