MXS-1929: Create internal server representation

The server now has an internal C++ version that extends the public one.
This commit is contained in:
Markus Mäkelä
2018-08-01 09:12:41 +03:00
parent ea8b522c8a
commit 4c7a5017bc
5 changed files with 125 additions and 170 deletions

View File

@ -15,13 +15,19 @@
* @file server.c - A representation of a backend server within the gateway.
*
*/
#include "internal/server.hh"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string>
#include <list>
#include <mutex>
#include <maxscale/config.h>
#include <maxscale/service.h>
@ -54,6 +60,7 @@ using maxscale::Worker;
using maxscale::WorkerTask;
using std::string;
using Guard = std::lock_guard<std::mutex>;
/** The latin1 charset */
#define SERVER_DEFAULT_CHARSET 0x08
@ -64,13 +71,12 @@ const char CN_PERSISTMAXTIME[] = "persistmaxtime";
const char CN_PERSISTPOOLMAX[] = "persistpoolmax";
const char CN_PROXY_PROTOCOL[] = "proxy_protocol";
static SPINLOCK server_spin = SPINLOCK_INIT;
static SERVER *allServers = NULL;
static std::mutex server_lock;
static std::list<Server*> all_servers;
static const char ERR_CANNOT_MODIFY[] = "The server is monitored, so only the maintenance status can be "
"set/cleared manually. Status was not modified.";
static const char WRN_REQUEST_OVERWRITTEN[] = "Previous maintenance request was not yet read by the monitor "
"and was overwritten.";
static void spin_reporter(void *, char *, int);
static void server_parameter_free(SERVER_PARAM *tofree);
@ -113,7 +119,7 @@ SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params)
return NULL;
}
SERVER *server = (SERVER *)MXS_CALLOC(1, sizeof(SERVER));
Server *server = new (std::nothrow) Server;
char *my_name = MXS_STRDUP(name);
char *my_protocol = MXS_STRDUP(protocol);
char *my_authenticator = MXS_STRDUP(authenticator);
@ -121,7 +127,7 @@ SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params)
if (!server || !my_name || !my_protocol || !my_authenticator || !persistent)
{
MXS_FREE(server);
delete server;
MXS_FREE(my_name);
MXS_FREE(persistent);
MXS_FREE(my_protocol);
@ -143,35 +149,37 @@ SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params)
server->server_chk_tail = CHK_NUM_SERVER;
#endif
server->name = my_name;
server->port = config_get_integer(params, CN_PORT);
server->protocol = my_protocol;
server->authenticator = my_authenticator;
server->auth_instance = auth_instance;
server->port = config_get_integer(params, CN_PORT);
server->server_ssl = ssl;
server->status = SERVER_RUNNING;
server->maint_request = MAINTENANCE_NO_CHANGE;
server->node_id = -1;
server->rlag = MAX_RLAG_UNDEFINED;
server->master_id = -1;
server->parameters = NULL;
spinlock_init(&server->lock);
server->persistent = persistent;
server->persistmax = 0;
server->persistpoolmax = config_get_integer(params, CN_PERSISTPOOLMAX);
server->persistmaxtime = config_get_integer(params, CN_PERSISTMAXTIME);
server->monuser[0] = '\0';
server->monpw[0] = '\0';
server->is_active = true;
server->charset = SERVER_DEFAULT_CHARSET;
server->persistpoolmax = config_get_integer(params, CN_PERSISTPOOLMAX);
server->persistmaxtime = config_get_integer(params, CN_PERSISTMAXTIME);
server->proxy_protocol = config_get_bool(params, CN_PROXY_PROTOCOL);
// Set last event to server_up as the server is in Running state on startup
server->parameters = NULL;
spinlock_init(&server->lock);
server->is_active = true;
server->auth_instance = auth_instance;
server->server_ssl = ssl;
server->persistent = persistent;
server->charset = SERVER_DEFAULT_CHARSET;
memset(&server->stats, 0, sizeof(server->stats));
server->persistmax = 0;
server->last_event = SERVER_UP_EVENT;
server->triggered_at = 0;
// Log all warnings once
server->active_event = false;
server->status = SERVER_RUNNING;
server->maint_request = MAINTENANCE_NO_CHANGE;
server->version_string[0] = '\0';
server->version = 0;
server->server_type = SERVER_TYPE_MARIADB;
server->node_id = -1;
server->rlag = MAX_RLAG_UNDEFINED;
server->node_ts = 0;
server->master_id = -1;
server->master_err_is_logged = false;
server->warn_ssl_not_enabled = true;
server->disk_space_threshold = NULL;
if (*monuser && *monpw)
@ -187,10 +195,9 @@ SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params)
}
}
spinlock_acquire(&server_spin);
server->next = allServers;
allServers = server;
spinlock_release(&server_spin);
Guard guard(server_lock);
// This keeps the order of the servers the same as in 2.2
all_servers.push_front(server);
return server;
}
@ -202,50 +209,36 @@ SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params)
* @param server The service to deallocate
* @return Returns true if the server was freed
*/
int
server_free(SERVER *tofreeserver)
void server_free(Server* server)
{
SERVER *server;
ss_dassert(server);
/* First of all remove from the linked list */
spinlock_acquire(&server_spin);
if (allServers == tofreeserver)
{
allServers = tofreeserver->next;
Guard guard(server_lock);
auto it = std::find(all_servers.begin(), all_servers.end(), server);
ss_dassert(it != all_servers.end());
all_servers.erase(it);
}
else
{
server = allServers;
while (server && server->next != tofreeserver)
{
server = server->next;
}
if (server)
{
server->next = tofreeserver->next;
}
}
spinlock_release(&server_spin);
/* Clean up session and free the memory */
MXS_FREE(tofreeserver->protocol);
MXS_FREE(tofreeserver->name);
MXS_FREE(tofreeserver->authenticator);
server_parameter_free(tofreeserver->parameters);
MXS_FREE(server->protocol);
MXS_FREE(server->name);
MXS_FREE(server->authenticator);
server_parameter_free(server->parameters);
if (tofreeserver->persistent)
if (server->persistent)
{
int nthr = config_threadcount();
for (int i = 0; i < nthr; i++)
{
dcb_persistent_clean_count(tofreeserver->persistent[i], i, true);
dcb_persistent_clean_count(server->persistent[i], i, true);
}
MXS_FREE(tofreeserver->persistent);
MXS_FREE(server->persistent);
}
delete tofreeserver->disk_space_threshold;
MXS_FREE(tofreeserver);
return 1;
delete server->disk_space_threshold;
delete server;
}
/**
@ -316,16 +309,6 @@ DCB* server_get_persistent(SERVER *server, const char *user, const char* ip, con
return NULL;
}
static inline SERVER* next_active_server(SERVER *server)
{
while (server && !server->is_active)
{
server = server->next;
}
return server;
}
/**
* @brief Find a server with the specified name
*
@ -334,21 +317,17 @@ static inline SERVER* next_active_server(SERVER *server)
*/
SERVER * server_find_by_unique_name(const char *name)
{
spinlock_acquire(&server_spin);
SERVER *server = next_active_server(allServers);
Guard guard(server_lock);
while (server)
for (Server* server : all_servers)
{
if (server->name && strcmp(server->name, name) == 0)
if (server->is_active && strcmp(server->name, name) == 0)
{
break;
return server;
}
server = next_active_server(server->next);
}
spinlock_release(&server_spin);
return server;
return nullptr;
}
/**
@ -399,24 +378,19 @@ int server_find_by_unique_names(char **server_names, int size, SERVER*** output)
* @param port The server port
* @return The server or NULL if not found
*/
SERVER *
server_find(const char *servname, unsigned short port)
SERVER* server_find(const char *servname, unsigned short port)
{
spinlock_acquire(&server_spin);
SERVER *server = next_active_server(allServers);
Guard guard(server_lock);
while (server)
for (Server* server : all_servers)
{
if (strcmp(server->address, servname) == 0 && server->port == port)
if (server->is_active && strcmp(server->address, servname) == 0 && server->port == port)
{
break;
return server;
}
server = next_active_server(server->next);
}
spinlock_release(&server_spin);
return server;
return nullptr;
}
/**
@ -446,16 +420,15 @@ printServer(const SERVER *server)
void
printAllServers()
{
spinlock_acquire(&server_spin);
SERVER *server = next_active_server(allServers);
Guard guard(server_lock);
while (server)
for (Server* server : all_servers)
{
printServer(server);
server = next_active_server(server->next);
if (server->is_active)
{
printServer(server);
}
}
spinlock_release(&server_spin);
}
/**
@ -467,16 +440,15 @@ printAllServers()
void
dprintAllServers(DCB *dcb)
{
spinlock_acquire(&server_spin);
SERVER *server = next_active_server(allServers);
Guard guard(server_lock);
while (server)
for (Server* server : all_servers)
{
dprintServer(dcb, server);
server = next_active_server(server->next);
if (server->is_active)
{
dprintServer(dcb, server);
}
}
spinlock_release(&server_spin);
}
/**
@ -485,11 +457,11 @@ dprintAllServers(DCB *dcb)
void
dprintAllServersJson(DCB *dcb)
{
json_t* all_servers = server_list_to_json("");
char* dump = json_dumps(all_servers, JSON_INDENT(4));
json_t* all_servers_json = server_list_to_json("");
char* dump = json_dumps(all_servers_json, JSON_INDENT(4));
dcb_printf(dcb, "%s", dump);
MXS_FREE(dump);
json_decref(all_servers);
json_decref(all_servers_json);
}
/**
@ -624,19 +596,6 @@ dprintServer(DCB *dcb, const SERVER *server)
}
}
/**
* Display an entry from the spinlock statistics data
*
* @param dcb The DCB to print to
* @param desc Description of the statistic
* @param value The statistic value
*/
static void
spin_reporter(void *dcb, char *desc, int value)
{
dcb_printf((DCB *)dcb, "\t\t%-40s %d\n", desc, value);
}
/**
* Diagnostic to print number of DCBs in persistent pool for a server
*
@ -656,36 +615,35 @@ dprintPersistentDCBs(DCB *pdcb, const SERVER *server)
void
dListServers(DCB *dcb)
{
spinlock_acquire(&server_spin);
SERVER *server = next_active_server(allServers);
bool have_servers = false;
if (server)
Guard guard(server_lock);
bool have_servers = std::any_of(all_servers.begin(), all_servers.end(), [](Server* s)
{
return s->is_active;
});
if (have_servers)
{
have_servers = true;
dcb_printf(dcb, "Servers.\n");
dcb_printf(dcb, "-------------------+-----------------+-------+-------------+--------------------\n");
dcb_printf(dcb, "%-18s | %-15s | Port | Connections | %-20s\n",
"Server", "Address", "Status");
dcb_printf(dcb, "-------------------+-----------------+-------+-------------+--------------------\n");
}
while (server)
{
char *stat = server_status(server);
dcb_printf(dcb, "%-18s | %-15s | %5d | %11d | %s\n",
server->name, server->address,
server->port,
server->stats.n_current, stat);
MXS_FREE(stat);
server = next_active_server(server->next);
}
for (Server* server : all_servers)
{
if (server->is_active)
{
char *stat = server_status(server);
dcb_printf(dcb, "%-18s | %-15s | %5d | %11d | %s\n",
server->name, server->address,
server->port,
server->stats.n_current, stat);
MXS_FREE(stat);
}
}
if (have_servers)
{
dcb_printf(dcb, "-------------------+-----------------+-------+-------------+--------------------\n");
}
spinlock_release(&server_spin);
}
/**
@ -1032,20 +990,19 @@ size_t server_get_parameter(const SERVER *server, const char *name, char* out, s
std::unique_ptr<ResultSet> serverGetList()
{
std::unique_ptr<ResultSet> set = ResultSet::create({"Server", "Address", "Port", "Connections", "Status"});
spinlock_acquire(&server_spin);
Guard guard(server_lock);
for (SERVER* server = allServers; server; server = server->next)
for (Server* server : all_servers)
{
if (server_is_active(server))
{
char* stat = server_status(server);
set->add_row({server->name, server->address, std::to_string(server->port),
std::to_string(server->stats.n_current), stat});
std::to_string(server->stats.n_current), stat});
MXS_FREE(stat);
}
}
spinlock_release(&server_spin);
return set;
}
@ -1059,12 +1016,12 @@ std::unique_ptr<ResultSet> serverGetList()
void
server_update_address(SERVER *server, const char *address)
{
spinlock_acquire(&server_spin);
Guard guard(server_lock);
if (server && address)
{
strcpy(server->address, address);
}
spinlock_release(&server_spin);
}
/*
@ -1077,12 +1034,12 @@ server_update_address(SERVER *server, const char *address)
void
server_update_port(SERVER *server, unsigned short port)
{
spinlock_acquire(&server_spin);
Guard guard(server_lock);
if (server && port > 0)
{
server->port = port;
}
spinlock_release(&server_spin);
}
static struct
@ -1279,25 +1236,23 @@ bool server_serialize(const SERVER *server)
SERVER* server_repurpose_destroyed(const char *name, const char *protocol, const char *authenticator,
const char *address, const char *port)
{
spinlock_acquire(&server_spin);
SERVER *server = allServers;
while (server)
Guard guard(server_lock);
for (Server* server : all_servers)
{
CHK_SERVER(server);
if (strcmp(server->name, name) == 0 &&
if (!server->is_active &&
strcmp(server->name, name) == 0 &&
strcmp(server->protocol, protocol) == 0 &&
strcmp(server->authenticator, authenticator) == 0)
{
snprintf(server->address, sizeof(server->address), "%s", address);
server->port = atoi(port);
server->is_active = true;
break;
return server;
}
server = server->next;
}
spinlock_release(&server_spin);
return server;
return nullptr;
}
/**
* Set a status bit in the server under a lock. This ensures synchronization
@ -1547,10 +1502,9 @@ json_t* server_to_json(const SERVER* server, const char* host)
json_t* server_list_to_json(const char* host)
{
json_t* data = json_array();
Guard guard(server_lock);
spinlock_acquire(&server_spin);
for (SERVER* server = allServers; server; server = server->next)
for (Server* server : all_servers)
{
if (server_is_active(server))
{
@ -1558,8 +1512,6 @@ json_t* server_list_to_json(const char* host)
}
}
spinlock_release(&server_spin);
return mxs_json_resource(host, MXS_JSON_API_SERVERS, data);
}