Develop merge

Develop merge
This commit is contained in:
MassimilianoPinto
2016-09-08 08:53:32 +02:00
25 changed files with 578 additions and 455 deletions

View File

@ -89,4 +89,4 @@ The source code of MaxScale is tagged at GitHub with a tag, which is identical
with the version of MaxScale. For instance, the tag of version X.Y.Z of MaxScale with the version of MaxScale. For instance, the tag of version X.Y.Z of MaxScale
is X.Y.Z. Further, *master* always refers to the latest released non-beta version. is X.Y.Z. Further, *master* always refers to the latest released non-beta version.
The source code is available [here](https://github.com/mariadb-corporation/maxscale-bsl). The source code is available [here](https://github.com/mariadb-corporation/MaxScale).

View File

@ -143,7 +143,7 @@ retblock:
* be allocated. * be allocated.
*/ */
GWBUF * GWBUF *
gwbuf_alloc_and_load(unsigned int size, void *data) gwbuf_alloc_and_load(unsigned int size, const void *data)
{ {
GWBUF *rval; GWBUF *rval;
if ((rval = gwbuf_alloc(size)) != NULL) if ((rval = gwbuf_alloc(size)) != NULL)

View File

@ -249,40 +249,6 @@ load_mysql_users(SERV_LISTENER *listener)
return get_users(listener, listener->users); return get_users(listener, listener->users);
} }
/**
* Reload the user/passwd form mysql.user table into the service users' hashtable
* environment.
*
* @param service The current service
* @return -1 on any error or the number of users inserted (0 means no users at all)
*/
int
reload_mysql_users(SERV_LISTENER *listener)
{
int i;
USERS *newusers, *oldusers;
HASHTABLE *oldresources;
if ((newusers = mysql_users_alloc()) == NULL)
{
return 0;
}
spinlock_acquire(&listener->lock);
oldresources = listener->resources;
i = get_users(listener, newusers);
oldusers = listener->users;
listener->users = newusers;
spinlock_release(&listener->lock);
users_free(oldusers);
resource_free(oldresources);
return i;
}
/** /**
* Replace the user/passwd form mysql.user table into the service users' hashtable * Replace the user/passwd form mysql.user table into the service users' hashtable
* environment. * environment.
@ -294,33 +260,53 @@ reload_mysql_users(SERV_LISTENER *listener)
int int
replace_mysql_users(SERV_LISTENER *listener) replace_mysql_users(SERV_LISTENER *listener)
{ {
int i; USERS *newusers = mysql_users_alloc();
USERS *newusers, *oldusers;
HASHTABLE *oldresources;
if ((newusers = mysql_users_alloc()) == NULL) if (newusers == NULL)
{ {
return -1; return -1;
} }
spinlock_acquire(&listener->lock); spinlock_acquire(&listener->lock);
oldresources = listener->resources;
/* load db users ad db grants */ /** TODO: Make the listener resource a part of the USERS struct */
i = get_users(listener, newusers); HASHTABLE *oldresources = listener->resources;
/* load users and grants from the backend database */
int i = get_users(listener, newusers);
if (i <= 0) if (i <= 0)
{ {
users_free(newusers); /** Failed to load users */
/* restore resources */ if (listener->users)
listener->resources = oldresources; {
/* Restore old users and resources */
users_free(newusers);
listener->resources = oldresources;
}
else
{
/* No users allocated, use the empty new one */
listener->users = newusers;
}
spinlock_release(&listener->lock); spinlock_release(&listener->lock);
return i; return i;
} }
oldusers = listener->users; USERS *oldusers = listener->users;
/* digest compare */ /**
* TODO: Comparing the checksum after loading users is not necessary. We
* have already queried the server, allocated memory and done the processing
* so comparing if a change was made is pointless since the end result is
* always the same. We end up with either the same users or a new set of
* users. If the new users would always be taken into use, we'd avoid
* the costly task of calculating the diff.
*
* An improvement to the diff calculation would be to push the calculation
* to the backend server. This way the bandwidth usage would be minimized
* and the backend server would tell us if we need to query for more data.
*/
if (oldusers != NULL && memcmp(oldusers->cksum, newusers->cksum, if (oldusers != NULL && memcmp(oldusers->cksum, newusers->cksum,
SHA_DIGEST_LENGTH) == 0) SHA_DIGEST_LENGTH) == 0)
{ {

View File

@ -523,8 +523,24 @@ GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf)
goto return_packetbuf; goto return_packetbuf;
} }
totalbuflen = gwbuf_length(readbuf); totalbuflen = gwbuf_length(readbuf);
data = (uint8_t *)GWBUF_DATA((readbuf)); if (totalbuflen < MYSQL_HEADER_LEN)
packetlen = MYSQL_GET_PACKET_LEN(data) + 4; {
packetbuf = NULL;
goto return_packetbuf;
}
if (GWBUF_LENGTH(readbuf) >= 3) // The length is in the 3 first bytes.
{
data = (uint8_t *)GWBUF_DATA((readbuf));
packetlen = MYSQL_GET_PACKET_LEN(data) + 4;
}
else
{
// The header is split between two GWBUFs.
uint8_t length[3];
gwbuf_copy_data(readbuf, 0, 3, length);
packetlen = MYSQL_GET_PACKET_LEN(length) + 4;
}
/** packet is incomplete */ /** packet is incomplete */
if (packetlen > totalbuflen) if (packetlen > totalbuflen)

View File

@ -11,6 +11,7 @@
* Public License. * Public License.
*/ */
#include <inttypes.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
@ -164,11 +165,11 @@ static struct
ts_stats_t *n_pollev; /*< Number of polls returning events */ ts_stats_t *n_pollev; /*< Number of polls returning events */
ts_stats_t *n_nbpollev; /*< Number of polls returning events */ ts_stats_t *n_nbpollev; /*< Number of polls returning events */
ts_stats_t *n_nothreads; /*< Number of times no threads are polling */ ts_stats_t *n_nothreads; /*< Number of times no threads are polling */
int n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */ int32_t n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */
int evq_length; /*< Event queue length */ int32_t evq_length; /*< Event queue length */
int evq_pending; /*< Number of pending descriptors in event queue */ int32_t evq_pending; /*< Number of pending descriptors in event queue */
int evq_max; /*< Maximum event queue length */ int32_t evq_max; /*< Maximum event queue length */
int wake_evqpending; /*< Woken from epoll_wait with pending events in queue */ int32_t wake_evqpending; /*< Woken from epoll_wait with pending events in queue */
ts_stats_t *blockingpolls; /*< Number of epoll_waits with a timeout specified */ ts_stats_t *blockingpolls; /*< Number of epoll_waits with a timeout specified */
} pollStats; } pollStats;
@ -178,10 +179,10 @@ static struct
*/ */
static struct static struct
{ {
unsigned int qtimes[N_QUEUE_TIMES + 1]; uint32_t qtimes[N_QUEUE_TIMES + 1];
unsigned int exectimes[N_QUEUE_TIMES + 1]; uint32_t exectimes[N_QUEUE_TIMES + 1];
unsigned long maxqtime; uint64_t maxqtime;
unsigned long maxexectime; uint64_t maxexectime;
} queueStats; } queueStats;
/** /**
@ -1258,42 +1259,42 @@ dprintPollStats(DCB *dcb)
int i; int i;
dcb_printf(dcb, "\nPoll Statistics.\n\n"); dcb_printf(dcb, "\nPoll Statistics.\n\n");
dcb_printf(dcb, "No. of epoll cycles: %d\n", dcb_printf(dcb, "No. of epoll cycles: %" PRId64 "\n",
ts_stats_sum(pollStats.n_polls)); ts_stats_sum(pollStats.n_polls));
dcb_printf(dcb, "No. of epoll cycles with wait: %d\n", dcb_printf(dcb, "No. of epoll cycles with wait: %" PRId64 "\n",
ts_stats_sum(pollStats.blockingpolls)); ts_stats_sum(pollStats.blockingpolls));
dcb_printf(dcb, "No. of epoll calls returning events: %d\n", dcb_printf(dcb, "No. of epoll calls returning events: %" PRId64 "\n",
ts_stats_sum(pollStats.n_pollev)); ts_stats_sum(pollStats.n_pollev));
dcb_printf(dcb, "No. of non-blocking calls returning events: %d\n", dcb_printf(dcb, "No. of non-blocking calls returning events: %" PRId64 "\n",
ts_stats_sum(pollStats.n_nbpollev)); ts_stats_sum(pollStats.n_nbpollev));
dcb_printf(dcb, "No. of read events: %d\n", dcb_printf(dcb, "No. of read events: %" PRId64 "\n",
ts_stats_sum(pollStats.n_read)); ts_stats_sum(pollStats.n_read));
dcb_printf(dcb, "No. of write events: %d\n", dcb_printf(dcb, "No. of write events: %" PRId64 "\n",
ts_stats_sum(pollStats.n_write)); ts_stats_sum(pollStats.n_write));
dcb_printf(dcb, "No. of error events: %d\n", dcb_printf(dcb, "No. of error events: %" PRId64 "\n",
ts_stats_sum(pollStats.n_error)); ts_stats_sum(pollStats.n_error));
dcb_printf(dcb, "No. of hangup events: %d\n", dcb_printf(dcb, "No. of hangup events: %" PRId64 "\n",
ts_stats_sum(pollStats.n_hup)); ts_stats_sum(pollStats.n_hup));
dcb_printf(dcb, "No. of accept events: %d\n", dcb_printf(dcb, "No. of accept events: %" PRId64 "\n",
ts_stats_sum(pollStats.n_accept)); ts_stats_sum(pollStats.n_accept));
dcb_printf(dcb, "No. of times no threads polling: %d\n", dcb_printf(dcb, "No. of times no threads polling: %" PRId64 "\n",
ts_stats_sum(pollStats.n_nothreads)); ts_stats_sum(pollStats.n_nothreads));
dcb_printf(dcb, "Current event queue length: %d\n", dcb_printf(dcb, "Current event queue length: %" PRId32 "\n",
pollStats.evq_length); pollStats.evq_length);
dcb_printf(dcb, "Maximum event queue length: %d\n", dcb_printf(dcb, "Maximum event queue length: %" PRId32 "\n",
pollStats.evq_max); pollStats.evq_max);
dcb_printf(dcb, "No. of DCBs with pending events: %d\n", dcb_printf(dcb, "No. of DCBs with pending events: %" PRId32 "\n",
pollStats.evq_pending); pollStats.evq_pending);
dcb_printf(dcb, "No. of wakeups with pending queue: %d\n", dcb_printf(dcb, "No. of wakeups with pending queue: %" PRId32 "\n",
pollStats.wake_evqpending); pollStats.wake_evqpending);
dcb_printf(dcb, "No of poll completions with descriptors\n"); dcb_printf(dcb, "No of poll completions with descriptors\n");
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n"); dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
for (i = 0; i < MAXNFDS - 1; i++) for (i = 0; i < MAXNFDS - 1; i++)
{ {
dcb_printf(dcb, "\t%2d\t\t\t%d\n", i + 1, pollStats.n_fds[i]); dcb_printf(dcb, "\t%2d\t\t\t%" PRId32 "\n", i + 1, pollStats.n_fds[i]);
} }
dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS, dcb_printf(dcb, "\t>= %d\t\t\t%" PRId32 "\n", MAXNFDS,
pollStats.n_fds[MAXNFDS - 1]); pollStats.n_fds[MAXNFDS - 1]);
#if SPINLOCK_PROFILE #if SPINLOCK_PROFILE

View File

@ -221,8 +221,12 @@ service_isvalid(SERVICE *service)
static int static int
serviceStartPort(SERVICE *service, SERV_LISTENER *port) serviceStartPort(SERVICE *service, SERV_LISTENER *port)
{ {
const size_t ANY_IPV4_ADDRESS_LEN = 7; // strlen("0:0:0:0");
int listeners = 0; int listeners = 0;
char config_bind[40]; size_t config_bind_len =
(port->address ? strlen(port->address) : ANY_IPV4_ADDRESS_LEN) + 1 + UINTLEN(port->port);
char config_bind[config_bind_len + 1]; // +1 for NULL
GWPROTOCOL *funcs; GWPROTOCOL *funcs;
if (service == NULL || service->router == NULL || service->router_instance == NULL) if (service == NULL || service->router == NULL || service->router_instance == NULL)
@ -247,97 +251,6 @@ serviceStartPort(SERVICE *service, SERV_LISTENER *port)
listener_init_SSL(port->ssl); listener_init_SSL(port->ssl);
} }
/**
* The following code should be located inside the authenticator modules.
*
* The authentication of users is done against the data being gathered here
* but the actual authentication happens inside the authenticator module.
* Since not all authenticators use the same data for authentication, it
* should be up to the authenticator to decide where the user data is retrieved
* and how it is stored.
*
* One way to do it would be to add a @c dcb->authfunc.loadusers(dcb)
* entry point which loads the users and a @c dcb->authfunc.freeusers(dcb)
* which in turn frees the users. It would also make sense to have a
* @c dcb->authfunc.reloadusers(dcb) entry point which would allow for a more
* optimized user reloading process instead of freeing and loading the users
* again.
*/
if (strcmp(port->protocol, "MySQLClient") == 0)
{
int loaded;
if (port->users == NULL)
{
/*
* Allocate specific data for MySQL users
* including hosts and db names
*/
port->users = mysql_users_alloc();
const char* cachedir = get_cachedir();
if ((loaded = load_mysql_users(port)) < 0)
{
MXS_ERROR("Unable to load users for service %s listening at %s:%d.",
service->name, port->address ? port->address : "0.0.0.0",
port->port);
/* Try loading authentication data from file cache */
char path[PATH_MAX];
sprintf(path, "%s/%s/%s/%s/%s", cachedir, service->name, port->name,
DBUSERS_DIR, DBUSERS_FILE);
if ((loaded = dbusers_load(port->users, path)) == -1)
{
MXS_ERROR("Failed to load cached users from '%s'.", path);
users_free(port->users);
dcb_close(port->listener);
goto retblock;
}
else
{
MXS_WARNING("Using cached credential information.");
}
}
else
{
/* Save authentication data to file cache */
char path[PATH_MAX];
sprintf(path, "%s/%s/%s/%s/", cachedir, service->name, port->name, DBUSERS_DIR);
if (mxs_mkdir_all(path, 0777))
{
strcat(path, DBUSERS_FILE);
dbusers_save(port->users, path);
}
}
if (loaded == 0)
{
MXS_ERROR("[%s]: failed to load any user information. Authentication"
" will probably fail as a result.", service->name);
}
/* At service start last update is set to USERS_REFRESH_TIME seconds earlier.
* This way MaxScale could try reloading users' just after startup
*/
service->rate_limit.last = time(NULL) - USERS_REFRESH_TIME;
service->rate_limit.nloads = 1;
MXS_NOTICE("Loaded %d MySQL Users for service [%s].",
loaded, service->name);
}
}
else
{
if (port->users == NULL)
{
/* Generic users table */
port->users = users_alloc();
}
}
if ((funcs = (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL) if ((funcs = (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL)
{ {
dcb_close(port->listener); dcb_close(port->listener);
@ -348,8 +261,33 @@ serviceStartPort(SERVICE *service, SERV_LISTENER *port)
service->name); service->name);
goto retblock; goto retblock;
} }
memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL)); memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL));
const char *authenticator_name = "NullAuth";
if (port->authenticator)
{
authenticator_name = port->authenticator;
}
else if (port->listener->func.auth_default)
{
authenticator_name = port->listener->func.auth_default();
}
GWAUTHENTICATOR *authfuncs = (GWAUTHENTICATOR *)load_module(authenticator_name, MODULE_AUTHENTICATOR);
if (authfuncs == NULL)
{
MXS_ERROR("Failed to load authenticator module '%s' for listener '%s'",
authenticator_name, port->name);
dcb_close(port->listener);
port->listener = NULL;
return 0;
}
memcpy(&port->listener->authfunc, authfuncs, sizeof(GWAUTHENTICATOR));
if (port->address) if (port->address)
{ {
sprintf(config_bind, "%s:%d", port->address, port->port); sprintf(config_bind, "%s:%d", port->address, port->port);
@ -359,6 +297,22 @@ serviceStartPort(SERVICE *service, SERV_LISTENER *port)
sprintf(config_bind, "0.0.0.0:%d", port->port); sprintf(config_bind, "0.0.0.0:%d", port->port);
} }
/** Load the authentication users before before starting the listener */
if (port->listener->authfunc.loadusers &&
(service->router->getCapabilities() & RCAP_TYPE_NO_USERS_INIT) == 0 &&
port->listener->authfunc.loadusers(port) != AUTH_LOADUSERS_OK)
{
MXS_ERROR("[%s] Failed to load users for listener '%s', authentication might not work.",
service->name, port->name);
}
/**
* At service start last update is set to USERS_REFRESH_TIME seconds earlier. This way MaxScale
* could try reloading users just after startup.
*/
service->rate_limit.last = time(NULL) - USERS_REFRESH_TIME;
service->rate_limit.nloads = 1;
if (port->listener->func.listen(port->listener, config_bind)) if (port->listener->func.listen(port->listener, config_bind))
{ {
port->listener->session = session_alloc(service, port->listener); port->listener->session = session_alloc(service, port->listener);
@ -1129,7 +1083,7 @@ serviceSetFilters(SERVICE *service, char *filters)
} }
else else
{ {
MXS_ERROR("Unable to find filter '%s' for service '%s'\n", MXS_ERROR("Unable to find filter '%s' for service '%s'",
filter_name, service->name); filter_name, service->name);
rval = false; rval = false;
break; break;
@ -1474,59 +1428,45 @@ service_update(SERVICE *service, char *router, char *user, char *auth)
int service_refresh_users(SERVICE *service) int service_refresh_users(SERVICE *service)
{ {
int ret = 1; int ret = 1;
/* check for another running getUsers request */
if (!spinlock_acquire_nowait(&service->users_table_spin)) if (spinlock_acquire_nowait(&service->users_table_spin))
{ {
MXS_DEBUG("%s: [service_refresh_users] failed to get get lock for " time_t now = time(NULL);
"loading new users' table: another thread is loading users",
service->name);
return 1; /* Check if refresh rate limit has been exceeded */
} if ((now < service->rate_limit.last + USERS_REFRESH_TIME) ||
(service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME))
/* check if refresh rate limit has exceeded */
if ((time(NULL) < (service->rate_limit.last + USERS_REFRESH_TIME)) ||
(service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME))
{
spinlock_release(&service->users_table_spin);
MXS_ERROR("%s: Refresh rate limit exceeded for load of users' table.",
service->name);
return 1;
}
service->rate_limit.nloads++;
/* update time and counter */
if (service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME)
{
service->rate_limit.nloads = 1;
service->rate_limit.last = time(NULL);
}
SERV_LISTENER *port = service->ports;
while (port)
{
if (replace_mysql_users(port) < 0)
{ {
ret = -1; MXS_ERROR("[%s] Refresh rate limit exceeded for load of users' table.", service->name);
break;
} }
ret++; else
port = port->next; {
service->rate_limit.nloads++;
/** If we have reached the limit on users refreshes, reset refresh time and count */
if (service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME)
{
service->rate_limit.nloads = 1;
service->rate_limit.last = now;
}
ret = 0;
for (SERV_LISTENER *port = service->ports; port; port = port->next)
{
if (port->listener->authfunc.loadusers(port) != AUTH_LOADUSERS_OK)
{
MXS_ERROR("[%s] Failed to load users for listener '%s', authentication might not work.",
service->name, port->name);
ret = 1;
}
}
}
spinlock_release(&service->users_table_spin);
} }
/* remove lock */ return ret;
spinlock_release(&service->users_table_spin);
if (ret >= 0)
{
return 0;
}
else
{
return 1;
}
} }
bool service_set_param_value(SERVICE* service, bool service_set_param_value(SERVICE* service,

View File

@ -58,7 +58,7 @@ void ts_stats_end()
ts_stats_t ts_stats_alloc() ts_stats_t ts_stats_alloc()
{ {
ss_dassert(stats_initialized); ss_dassert(stats_initialized);
return MXS_CALLOC(thread_count, sizeof(int)); return MXS_CALLOC(thread_count, sizeof(int64_t));
} }
/** /**
@ -80,13 +80,13 @@ void ts_stats_free(ts_stats_t stats)
* @param stats Statistics to read * @param stats Statistics to read
* @return Value of statistics * @return Value of statistics
*/ */
int ts_stats_sum(ts_stats_t stats) int64_t ts_stats_sum(ts_stats_t stats)
{ {
ss_dassert(stats_initialized); ss_dassert(stats_initialized);
int sum = 0; int64_t sum = 0;
for (int i = 0; i < thread_count; i++) for (int i = 0; i < thread_count; i++)
{ {
sum += ((int*)stats)[i]; sum += ((int64_t*)stats)[i];
} }
return sum; return sum;
} }

View File

@ -107,18 +107,57 @@ static char ok[] =
* CREATE OR REPLACE TABLE test.t1 (id int); * CREATE OR REPLACE TABLE test.t1 (id int);
* INSERT INTO test.t1 VALUES (3000); * INSERT INTO test.t1 VALUES (3000);
* SELECT * FROM test.t1; */ * SELECT * FROM test.t1; */
static char resultset[] = static const char resultset[] =
{ {
0x01, 0x00, 0x00, 0x01, 0x01, 0x22, 0x00, 0x00, 0x02, 0x03, 0x64, 0x65, 0x66, 0x04, 0x74, 0x65, /* Packet 1 */
0x73, 0x74, 0x02, 0x74, 0x31, 0x02, 0x74, 0x31, 0x02, 0x69, 0x64, 0x02, 0x69, 0x64, 0x0c, 0x3f, 0x01, 0x00, 0x00, 0x01, 0x01,
0x00, 0x0b, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x03, 0xfe, /* Packet 2 */
0x00, 0x00, 0x22, 0x00, 0x05, 0x00, 0x00, 0x04, 0x04, 0x33, 0x30, 0x30, 0x30, 0x05, 0x00, 0x00, 0x22, 0x00, 0x00, 0x02, 0x03, 0x64, 0x65, 0x66, 0x04, 0x74, 0x65, 0x73, 0x74, 0x02, 0x74, 0x31,
0x05, 0xfe, 0x00, 0x00, 0x22, 0x00 0x02, 0x74, 0x31, 0x02, 0x69, 0x64, 0x02, 0x69, 0x64, 0x0c, 0x3f,
0x00, 0x0b, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
/* Packet 3 */
0x05, 0x00, 0x00, 0x03, 0xfe, 0x00, 0x00, 0x22, 0x00,
/* Packet 4 */
0x05, 0x00, 0x00, 0x04, 0x04, 0x33, 0x30, 0x30, 0x30,
/* Packet 5 */
0x05, 0x00, 0x00, 0x05, 0xfe, 0x00, 0x00, 0x22, 0x00
}; };
#define PACKET_HDR_LEN 4
void test_single_sql_packet() #define PACKET_1_IDX 0
#define PACKET_1_LEN (PACKET_HDR_LEN + 0x01) // resultset[PACKET_1_IDX])
#define PACKET_2_IDX (PACKET_1_IDX + PACKET_1_LEN)
#define PACKET_2_LEN (PACKET_HDR_LEN + 0x22) // resultset[PACKET_2_IDX]);
#define PACKET_3_IDX (PACKET_2_IDX + PACKET_2_LEN)
#define PACKET_3_LEN (PACKET_HDR_LEN + 0x05) // resultset[PACKET_3_IDX]);
#define PACKET_4_IDX (PACKET_3_IDX + PACKET_3_LEN)
#define PACKET_4_LEN (PACKET_HDR_LEN + 0x05) // resultset[PACKET_4_IDX]);
#define PACKET_5_IDX (PACKET_4_IDX + PACKET_4_LEN)
#define PACKET_5_LEN (PACKET_HDR_LEN + 0x05) // resultset[PACKET_5_IDX]);
struct packet
{ {
int index;
int length;
} packets[] =
{
{ PACKET_1_IDX, PACKET_1_LEN },
{ PACKET_2_IDX, PACKET_2_LEN },
{ PACKET_3_IDX, PACKET_3_LEN },
{ PACKET_4_IDX, PACKET_4_LEN },
{ PACKET_5_IDX, PACKET_5_LEN },
};
#define N_PACKETS (sizeof(packets)/sizeof(packets[0]))
//
// modutil_get_complete_packets
//
void test_single_sql_packet1()
{
printf("%s\n", __func__);
/** Single packet */ /** Single packet */
GWBUF* buffer = gwbuf_alloc_and_load(sizeof(ok), ok); GWBUF* buffer = gwbuf_alloc_and_load(sizeof(ok), ok);
GWBUF* complete = modutil_get_complete_packets(&buffer); GWBUF* complete = modutil_get_complete_packets(&buffer);
@ -144,8 +183,9 @@ void test_single_sql_packet()
ss_info_dassert(gwbuf_length(complete) == sizeof(ok), "Buffer should contain all data"); ss_info_dassert(gwbuf_length(complete) == sizeof(ok), "Buffer should contain all data");
} }
void test_multiple_sql_packets() void test_multiple_sql_packets1()
{ {
printf("%s\n", __func__);
/** All of the data */ /** All of the data */
GWBUF* buffer = gwbuf_alloc_and_load(sizeof(resultset), resultset); GWBUF* buffer = gwbuf_alloc_and_load(sizeof(resultset), resultset);
GWBUF* complete = modutil_get_complete_packets(&buffer); GWBUF* complete = modutil_get_complete_packets(&buffer);
@ -247,6 +287,160 @@ void test_multiple_sql_packets()
ss_info_dassert(memcmp(databuf, resultset, sizeof(resultset)) == 0, "Data should be OK"); ss_info_dassert(memcmp(databuf, resultset, sizeof(resultset)) == 0, "Data should be OK");
} }
//
// modutil_get_next_MySQL_packet
//
void test_single_sql_packet2()
{
printf("%s\n", __func__);
/** Single packet */
GWBUF* buffer;
GWBUF* next;
buffer = gwbuf_alloc_and_load(sizeof(ok), ok);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer == NULL, "Old buffer should be NULL");
ss_info_dassert(next, "Next packet buffer should not be NULL");
ss_info_dassert(gwbuf_length(next) == sizeof(ok), "Next packet buffer should contain enough data");
ss_info_dassert(memcmp(GWBUF_DATA(next), ok, GWBUF_LENGTH(next)) == 0,
"Next packet buffer's data should be equal to original data");
gwbuf_free(buffer);
gwbuf_free(next);
/** Partial single packet */
buffer = gwbuf_alloc_and_load(sizeof(ok) - 4, ok);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer, "Old buffer should be not NULL");
ss_info_dassert(next == NULL, "Next packet buffer should be NULL");
ss_info_dassert(gwbuf_length(buffer) == sizeof(ok) - 4, "Old buffer should contain right amount of data");
/** Add the missing data */
buffer = gwbuf_append(buffer, gwbuf_alloc_and_load(4, ok + sizeof(ok) - 4));
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer == NULL, "Old buffer should be NULL");
ss_info_dassert(next, "Next packet buffer should not be NULL");
//To be put back when the current realloc behaviour is replaced with splitting behaviour.
//ss_info_dassert(next->next, "The next packet should be a chain of buffers");
ss_info_dassert(gwbuf_length(next) == sizeof(ok), "Buffer should contain all data");
gwbuf_free(next);
}
void test_multiple_sql_packets2()
{
printf("%s\n", __func__);
/** All of the data */
GWBUF* buffer;
GWBUF* next;
buffer = gwbuf_alloc_and_load(sizeof(resultset), resultset);
// Empty buffer packet by packet.
for (int i = 0; i < N_PACKETS; i++)
{
GWBUF* next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(next, "Next packet buffer should not be NULL");
ss_info_dassert(gwbuf_length(next) == packets[i].length,
"Next packet buffer should contain enough data");
ss_info_dassert(memcmp(GWBUF_DATA(next), &resultset[packets[i].index], GWBUF_LENGTH(next)) == 0,
"Next packet buffer's data should be equal to original data");
gwbuf_free(next);
}
ss_info_dassert(buffer == NULL, "Buffer should be NULL");
size_t len;
// Exactly one packet
len = PACKET_1_LEN;
buffer = gwbuf_alloc_and_load(len, resultset);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer == NULL, "Old buffer should be NULL.");
ss_info_dassert(next, "Next should not be NULL.");
ss_info_dassert(GWBUF_LENGTH(next) == PACKET_1_LEN, "Length should match.");
gwbuf_free(next);
// Slightly less than one packet
len = PACKET_1_LEN - 1;
buffer = gwbuf_alloc_and_load(len, resultset);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer, "Old buffer should not be NULL.");
ss_info_dassert(next == NULL, "Next should be NULL.");
GWBUF *tail = gwbuf_alloc_and_load(sizeof(resultset) - len, resultset + len);
buffer = gwbuf_append(buffer, tail);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer, "Old buffer should not be NULL.");
ss_info_dassert(next, "Next should not be NULL.");
ss_info_dassert(gwbuf_length(next) == PACKET_1_LEN, "Length should match.");
gwbuf_free(buffer);
gwbuf_free(next);
// Slightly more than one packet
len = PACKET_1_LEN + 1;
buffer = gwbuf_alloc_and_load(len, resultset);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer, "Old buffer should not be NULL.");
ss_info_dassert(next, "Next should not be NULL.");
ss_info_dassert(GWBUF_LENGTH(next) == PACKET_1_LEN, "Length should match.");
gwbuf_free(next);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer, "Old buffer should not be NULL.");
ss_info_dassert(next == NULL, "Next should be NULL.");
tail = gwbuf_alloc_and_load(sizeof(resultset) - len, resultset + len);
buffer = gwbuf_append(buffer, tail);
next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(buffer, "Old buffer should not be NULL.");
ss_info_dassert(next, "Next should not be NULL.");
ss_info_dassert(gwbuf_length(next) == PACKET_2_LEN, "Length should match.");
ss_info_dassert(memcmp(GWBUF_DATA(next), &resultset[PACKET_2_IDX], GWBUF_LENGTH(next)) == 0,
"Next packet buffer's data should be equal to original data");
gwbuf_free(buffer);
gwbuf_free(next);
GWBUF *head;
/** Sliding cutoff of the buffer boundary */
for (size_t i = 0; i < sizeof(resultset); i++)
{
head = gwbuf_alloc_and_load(i, resultset);
tail = gwbuf_alloc_and_load(sizeof(resultset) - i, resultset + i);
head = gwbuf_append(head, tail);
next = modutil_get_next_MySQL_packet(&head);
int headlen = gwbuf_length(head);
int nextlen = next ? gwbuf_length(next) : 0;
ss_info_dassert(headlen + nextlen == sizeof(resultset),
"Both buffers should sum up to sizeof(resutlset) bytes");
uint8_t databuf[sizeof(resultset)];
gwbuf_copy_data(next, 0, nextlen, databuf);
gwbuf_copy_data(head, 0, headlen, databuf + nextlen);
ss_info_dassert(memcmp(databuf, resultset, sizeof(resultset)) == 0, "Data should be OK");
}
/** Fragmented buffer chain */
size_t chunk = 5;
size_t total = 0;
buffer = NULL;
do
{
chunk = chunk + 5 < sizeof(resultset) ? 5 : (chunk + 5) - sizeof(resultset);
buffer = gwbuf_append(buffer, gwbuf_alloc_and_load(chunk, resultset + total));
total += chunk;
}
while (total < sizeof(resultset));
for (int i = 0; i < N_PACKETS; i++)
{
GWBUF* next = modutil_get_next_MySQL_packet(&buffer);
ss_info_dassert(next, "Next packet buffer should not be NULL");
ss_info_dassert(gwbuf_length(next) == packets[i].length,
"Next packet buffer should contain enough data");
next = gwbuf_make_contiguous(next);
ss_info_dassert(memcmp(GWBUF_DATA(next), &resultset[packets[i].index], GWBUF_LENGTH(next)) == 0,
"Next packet buffer's data should be equal to original data");
gwbuf_free(next);
}
ss_info_dassert(buffer == NULL, "Buffer should be NULL");
}
void test_strnchr_esc_mysql() void test_strnchr_esc_mysql()
{ {
char comment1[] = "This will -- fail."; char comment1[] = "This will -- fail.";
@ -390,8 +584,10 @@ int main(int argc, char **argv)
result += test1(); result += test1();
result += test2(); result += test2();
test_single_sql_packet(); test_single_sql_packet1();
test_multiple_sql_packets(); test_single_sql_packet2();
test_multiple_sql_packets1();
test_multiple_sql_packets2();
test_strnchr_esc(); test_strnchr_esc();
test_strnchr_esc_mysql(); test_strnchr_esc_mysql();
test_large_packets(); test_large_packets();

View File

@ -71,17 +71,11 @@ users_alloc()
void void
users_free(USERS *users) users_free(USERS *users)
{ {
if (users == NULL) if (users)
{
MXS_ERROR("[%s:%d]: NULL parameter.", __FUNCTION__, __LINE__);
return;
}
if (users->data)
{ {
hashtable_free(users->data); hashtable_free(users->data);
MXS_FREE(users);
} }
MXS_FREE(users);
} }
/** /**
@ -217,3 +211,18 @@ dcb_usersPrint(DCB *dcb, USERS *users)
} }
dcb_printf(dcb, "\n"); dcb_printf(dcb, "\n");
} }
/**
* @brief Default user loading function
*
* A generic key-value user table is allocated for the service.
*
* @param port Listener configuration
* @return Always AUTH_LOADUSERS_OK
*/
int users_default_loadusers(SERV_LISTENER *port)
{
users_free(port->users);
port->users = users_alloc();
return AUTH_LOADUSERS_OK;
}

View File

@ -183,7 +183,7 @@ typedef struct gwbuf
* Function prototypes for the API to maniplate the buffers * Function prototypes for the API to maniplate the buffers
*/ */
extern GWBUF *gwbuf_alloc(unsigned int size); extern GWBUF *gwbuf_alloc(unsigned int size);
extern GWBUF *gwbuf_alloc_and_load(unsigned int size, void *data); extern GWBUF *gwbuf_alloc_and_load(unsigned int size, const void *data);
extern void gwbuf_free(GWBUF *buf); extern void gwbuf_free(GWBUF *buf);
extern GWBUF *gwbuf_clone(GWBUF *buf); extern GWBUF *gwbuf_clone(GWBUF *buf);
extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail); extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail);

View File

@ -36,6 +36,7 @@
struct dcb; struct dcb;
struct server; struct server;
struct session; struct session;
struct servlistener;
/** /**
* @verbatim * @verbatim
@ -44,6 +45,8 @@ struct session;
* extract Extract the data from a buffer and place in a structure * extract Extract the data from a buffer and place in a structure
* connectssl Determine whether the connection can support SSL * connectssl Determine whether the connection can support SSL
* authenticate Carry out the authentication * authenticate Carry out the authentication
* free Free extracted data
* loadusers Load or update authenticator user data
* @endverbatim * @endverbatim
* *
* This forms the "module object" for authenticator modules within the gateway. * This forms the "module object" for authenticator modules within the gateway.
@ -56,14 +59,19 @@ typedef struct gw_authenticator
bool (*connectssl)(struct dcb *); bool (*connectssl)(struct dcb *);
int (*authenticate)(struct dcb *); int (*authenticate)(struct dcb *);
void (*free)(struct dcb *); void (*free)(struct dcb *);
int (*loadusers)(struct servlistener *);
} GWAUTHENTICATOR; } GWAUTHENTICATOR;
/** Return values for the loadusers entry point */
#define AUTH_LOADUSERS_OK 0 /**< Users loaded successfully */
#define AUTH_LOADUSERS_ERROR 1 /**< Failed to load users */
/** /**
* The GWAUTHENTICATOR version data. The following should be updated whenever * The GWAUTHENTICATOR version data. The following should be updated whenever
* the GWAUTHENTICATOR structure is changed. See the rules defined in modinfo.h * the GWAUTHENTICATOR structure is changed. See the rules defined in modinfo.h
* that define how these numbers should change. * that define how these numbers should change.
*/ */
#define GWAUTHENTICATOR_VERSION {1, 0, 0} #define GWAUTHENTICATOR_VERSION {1, 1, 0}
#endif /* GW_AUTHENTICATOR_H */ #endif /* GW_AUTHENTICATOR_H */

View File

@ -51,7 +51,7 @@ typedef struct servlistener
struct dcb *listener; /**< The DCB for the listener */ struct dcb *listener; /**< The DCB for the listener */
struct users *users; /**< The user data for this listener */ struct users *users; /**< The user data for this listener */
HASHTABLE *resources; /**< hastable for listener resources, i.e. database names */ HASHTABLE *resources; /**< hastable for listener resources, i.e. database names */
struct service* service; /**< The service which used by this listener */ struct service* service; /**< The service which used by this listener */
SPINLOCK lock; SPINLOCK lock;
struct servlistener *next; /**< Next service protocol */ struct servlistener *next; /**< Next service protocol */
} SERV_LISTENER; } SERV_LISTENER;

View File

@ -92,10 +92,12 @@ typedef struct router_object
*/ */
typedef enum router_capability_t typedef enum router_capability_t
{ {
RCAP_TYPE_UNDEFINED = 0x00, RCAP_TYPE_UNDEFINED = 0x00,
RCAP_TYPE_STMT_INPUT = 0x01, /*< statement per buffer */ RCAP_TYPE_STMT_INPUT = 0x01, /**< Statement per buffer */
RCAP_TYPE_PACKET_INPUT = 0x02, /*< data as it was read from DCB */ RCAP_TYPE_PACKET_INPUT = 0x02, /**< Data as it was read from DCB */
RCAP_TYPE_NO_RSESSION = 0x04 /*< router does not use router sessions */ RCAP_TYPE_NO_RSESSION = 0x04, /**< Router does not use router sessions */
RCAP_TYPE_NO_USERS_INIT = 0x08 /**< Prevent the loading of authenticator
users when the service is started */
} router_capability_t; } router_capability_t;

View File

@ -25,6 +25,8 @@
* @endverbatim * @endverbatim
*/ */
#include <stdint.h>
typedef void* ts_stats_t; typedef void* ts_stats_t;
/** stats_init should be called only once */ /** stats_init should be called only once */
@ -35,7 +37,7 @@ void ts_stats_end();
ts_stats_t ts_stats_alloc(); ts_stats_t ts_stats_alloc();
void ts_stats_free(ts_stats_t stats); void ts_stats_free(ts_stats_t stats);
int ts_stats_sum(ts_stats_t stats); int64_t ts_stats_sum(ts_stats_t stats);
/** /**
* @brief Increment thread statistics by one * @brief Increment thread statistics by one
@ -46,7 +48,7 @@ int ts_stats_sum(ts_stats_t stats);
static void inline static void inline
ts_stats_increment(ts_stats_t stats, int thread_id) ts_stats_increment(ts_stats_t stats, int thread_id)
{ {
((int*)stats)[thread_id]++; ((int64_t*)stats)[thread_id]++;
} }
/** /**
@ -63,7 +65,7 @@ ts_stats_increment(ts_stats_t stats, int thread_id)
static void inline static void inline
ts_stats_set(ts_stats_t stats, int value, int thread_id) ts_stats_set(ts_stats_t stats, int value, int thread_id)
{ {
((int*)stats)[thread_id] = value; ((int64_t*)stats)[thread_id] = value;
} }
#endif #endif

View File

@ -14,6 +14,7 @@
*/ */
#include <hashtable.h> #include <hashtable.h>
#include <dcb.h> #include <dcb.h>
#include <listener.h>
#include <openssl/sha.h> #include <openssl/sha.h>
/** /**
@ -64,6 +65,8 @@ extern int users_delete(USERS *, char *); /**< Delete a user from the us
extern char *users_fetch(USERS *, char *); /**< Fetch the authentication data for a user */ extern char *users_fetch(USERS *, char *); /**< Fetch the authentication data for a user */
extern int users_update(USERS *, char *, char *); /**< Change the password data for a user in extern int users_update(USERS *, char *, char *); /**< Change the password data for a user in
the users table */ the users table */
extern int users_default_loadusers(SERV_LISTENER *port); /**< A generic implementation of the authenticator
* loadusers entry point */
extern void usersPrint(USERS *); /**< Print data about the users loaded */ extern void usersPrint(USERS *); /**< Print data about the users loaded */
extern void dcb_usersPrint(DCB *, USERS *); /**< Print data about the users loaded */ extern void dcb_usersPrint(DCB *, USERS *); /**< Print data about the users loaded */

View File

@ -46,8 +46,7 @@ MODULE_INFO info =
"The CDC client to MaxScale authenticator implementation" "The CDC client to MaxScale authenticator implementation"
}; };
static char *version_str = "V1.0.0"; static char *version_str = "V1.1.0";
static int cdc_load_users_init = 0;
static int cdc_auth_set_protocol_data(DCB *dcb, GWBUF *buf); static int cdc_auth_set_protocol_data(DCB *dcb, GWBUF *buf);
static bool cdc_auth_is_client_ssl_capable(DCB *dcb); static bool cdc_auth_is_client_ssl_capable(DCB *dcb);
@ -55,9 +54,6 @@ static int cdc_auth_authenticate(DCB *dcb);
static void cdc_auth_free_client_data(DCB *dcb); static void cdc_auth_free_client_data(DCB *dcb);
static int cdc_set_service_user(SERV_LISTENER *listener); static int cdc_set_service_user(SERV_LISTENER *listener);
static int cdc_read_users(USERS *users, char *usersfile);
static int cdc_load_users(SERV_LISTENER *listener);
static int cdc_refresh_users(SERV_LISTENER *listener);
static int cdc_replace_users(SERV_LISTENER *listener); static int cdc_replace_users(SERV_LISTENER *listener);
extern char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len); extern char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len);
@ -75,6 +71,7 @@ static GWAUTHENTICATOR MyObject =
cdc_auth_is_client_ssl_capable, /* Check if client supports SSL */ cdc_auth_is_client_ssl_capable, /* Check if client supports SSL */
cdc_auth_authenticate, /* Authenticate user credentials */ cdc_auth_authenticate, /* Authenticate user credentials */
cdc_auth_free_client_data, /* Free the client data held in DCB */ cdc_auth_free_client_data, /* Free the client data held in DCB */
cdc_replace_users
}; };
static int cdc_auth_check( static int cdc_auth_check(
@ -136,43 +133,25 @@ GWAUTHENTICATOR* GetModuleObject()
static int cdc_auth_check(DCB *dcb, CDC_protocol *protocol, char *username, uint8_t *auth_data, static int cdc_auth_check(DCB *dcb, CDC_protocol *protocol, char *username, uint8_t *auth_data,
unsigned int *flags) unsigned int *flags)
{ {
char *user_password; if (dcb->listener->users)
if (!cdc_load_users_init)
{ {
/* Load db users or set service user */ char *user_password = users_fetch(dcb->listener->users, username);
if (cdc_load_users(dcb->listener) < 1)
if (user_password)
{ {
cdc_set_service_user(dcb->listener); /* compute SHA1 of auth_data */
} uint8_t sha1_step1[SHA_DIGEST_LENGTH] = "";
char hex_step1[2 * SHA_DIGEST_LENGTH + 1] = "";
cdc_load_users_init = 1; gw_sha1_str(auth_data, SHA_DIGEST_LENGTH, sha1_step1);
} gw_bin2hex(hex_step1, sha1_step1, SHA_DIGEST_LENGTH);
user_password = users_fetch(dcb->listener->users, username); return memcmp(user_password, hex_step1, SHA_DIGEST_LENGTH) == 0 ?
CDC_STATE_AUTH_OK : CDC_STATE_AUTH_FAILED;
if (!user_password)
{
return CDC_STATE_AUTH_FAILED;
}
else
{
/* compute SHA1 of auth_data */
uint8_t sha1_step1[SHA_DIGEST_LENGTH] = "";
char hex_step1[2 * SHA_DIGEST_LENGTH + 1] = "";
gw_sha1_str(auth_data, SHA_DIGEST_LENGTH, sha1_step1);
gw_bin2hex(hex_step1, sha1_step1, SHA_DIGEST_LENGTH);
if (memcmp(user_password, hex_step1, SHA_DIGEST_LENGTH) == 0)
{
return CDC_STATE_AUTH_OK;
}
else
{
return CDC_STATE_AUTH_FAILED;
} }
} }
return CDC_STATE_AUTH_FAILED;
} }
/** /**
@ -200,10 +179,9 @@ cdc_auth_authenticate(DCB *dcb)
auth_ret = cdc_auth_check(dcb, protocol, client_data->user, client_data->auth_data, client_data->flags); auth_ret = cdc_auth_check(dcb, protocol, client_data->user, client_data->auth_data, client_data->flags);
/* On failed authentication try to reload user table */ /* On failed authentication try to reload users and authenticate again */
if (CDC_STATE_AUTH_OK != auth_ret && 0 == cdc_refresh_users(dcb->listener)) if (CDC_STATE_AUTH_OK != auth_ret && cdc_replace_users(dcb->listener) == AUTH_LOADUSERS_OK)
{ {
/* Call protocol authentication */
auth_ret = cdc_auth_check(dcb, protocol, client_data->user, client_data->auth_data, client_data->flags); auth_ret = cdc_auth_check(dcb, protocol, client_data->user, client_data->auth_data, client_data->flags);
} }
@ -415,48 +393,6 @@ cdc_set_service_user(SERV_LISTENER *listener)
return 0; return 0;
} }
/*
* Load AVRO users into (service->users)
*
* @param service The current service
* @return -1 on failure, 0 for no users found, > 0 for found users
*/
static int
cdc_load_users(SERV_LISTENER *listener)
{
SERVICE *service = listener->service;
int loaded = -1;
char path[PATH_MAX + 1] = "";
/* File path for router cached authentication data */
snprintf(path, PATH_MAX, "%s/%s/cdcusers", get_datadir(), service->name);
/* Allocate users table */
if (listener->users == NULL)
{
listener->users = users_alloc();
}
/* Try loading authentication data from file cache */
loaded = cdc_read_users(listener->users, path);
if (loaded == -1)
{
MXS_ERROR("Service %s, Unable to read AVRO users information from %s."
" No AVRO user added to service users table. Service user is still allowed to connect.",
service->name,
path);
}
/* At service start last update is set to CDC_USERS_REFRESH_TIME seconds
* earlier. This way MaxScale could try reloading users just after startup.
*/
service->rate_limit.last = time(NULL) - CDC_USERS_REFRESH_TIME;
service->rate_limit.nloads = 1;
return loaded;
}
/** /**
* Load the AVRO users * Load the AVRO users
* *
@ -539,127 +475,52 @@ cdc_read_users(USERS *users, char *usersfile)
return loaded; return loaded;
} }
/** /**
* * Refresh the database users for the service * @brief Replace the user/passwd in the servicei users tbale from a db file
* * This function replaces the MySQL users used by the service with the latest
* * version found on the backend servers. There is a limit on how often the users
* * can be reloaded and if this limit is exceeded, the reload will fail.
* * @param service Service to reload
* * @return 0 on success and 1 on error
* */
static int
cdc_refresh_users(SERV_LISTENER *listener)
{
int ret = 1;
SERVICE *service = listener->service;
/* check for another running getUsers request */
if (!spinlock_acquire_nowait(&service->users_table_spin))
{
MXS_DEBUG("%s: [service_refresh_users] failed to get get lock for "
"loading new users' table: another thread is loading users",
service->name);
return 1;
}
/* check if refresh rate limit has exceeded */
if ((time(NULL) < (service->rate_limit.last + CDC_USERS_REFRESH_TIME)) ||
(service->rate_limit.nloads > CDC_USERS_REFRESH_MAX_PER_TIME))
{
spinlock_release(&service->users_table_spin);
MXS_ERROR("%s: Refresh rate limit exceeded for load of users' table.",
service->name);
return 1;
}
service->rate_limit.nloads++;
/* update time and counter */
if (service->rate_limit.nloads > CDC_USERS_REFRESH_MAX_PER_TIME)
{
service->rate_limit.nloads = 1;
service->rate_limit.last = time(NULL);
}
ret = cdc_replace_users(listener);
/* remove lock */
spinlock_release(&service->users_table_spin);
if (ret >= 0)
{
return 0;
}
else
{
return 1;
}
}
/* Replace the user/passwd in the servicei users tbale from a db file.
* The replacement is succesful only if the users' table checksums differ
* *
* @param service The current service * @param service The current service
* @return -1 on any error or the number of users inserted (0 means no users at all) */
* */ int cdc_replace_users(SERV_LISTENER *listener)
static int
cdc_replace_users(SERV_LISTENER *listener)
{ {
SERVICE *service = listener->service; int rc = AUTH_LOADUSERS_ERROR;
int i; USERS *newusers = users_alloc();
USERS *newusers, *oldusers;
char path[PATH_MAX + 1] = "";
/* File path for router cached authentication data */ if (newusers)
snprintf(path, PATH_MAX, "%s/%s/cdcusers", get_datadir(), service->name);
if ((newusers = users_alloc()) == NULL)
{ {
return -1; char path[PATH_MAX + 1];
snprintf(path, PATH_MAX, "%s/%s/cdcusers", get_datadir(), listener->service->name);
int i = cdc_read_users(newusers, path);
USERS *oldusers = NULL;
spinlock_acquire(&listener->lock);
if (i > 0)
{
/** Successfully loaded at least one user */
oldusers = listener->users;
listener->users = newusers;
rc = AUTH_LOADUSERS_OK;
}
else if (listener->users)
{
/** Failed to load users, use the old users table */
users_free(newusers);
}
else
{
/** No existing users, use the new empty users table */
listener->users = newusers;
}
cdc_set_service_user(listener);
spinlock_release(&listener->lock);
if (oldusers)
{
users_free(oldusers);
}
} }
return rc;
/* load users */
i = cdc_read_users(newusers, path);
if (i <= 0)
{
users_free(newusers);
return i;
}
spinlock_acquire(&listener->lock);
oldusers = listener->users;
/* digest compare */
if (oldusers != NULL && memcmp(oldusers->cksum, newusers->cksum,
SHA_DIGEST_LENGTH) == 0)
{
/* same data, nothing to do */
MXS_DEBUG("%lu [cdc_replace_users] users' tables not switched, checksum is the same",
pthread_self());
/* free the new table */
users_free(newusers);
i = 0;
}
else
{
/* replace the service with effective new data */
MXS_DEBUG("%lu [cdc_replace_users] users' tables replaced, checksum differs",
pthread_self());
listener->users = newusers;
}
spinlock_release(&listener->lock);
if (i && oldusers)
{
/* free the old table */
users_free(oldusers);
}
return i;
} }

View File

@ -12,7 +12,7 @@
*/ */
/** /**
* @file http_ba__auth.c * @file http_ba_auth.c
* *
* MaxScale HTTP Basic Access authentication for the HTTPD protocol module. * MaxScale HTTP Basic Access authentication for the HTTPD protocol module.
* *
@ -32,6 +32,7 @@
#include <openssl/bio.h> #include <openssl/bio.h>
#include <service.h> #include <service.h>
#include <secrets.h> #include <secrets.h>
#include <users.h>
/* @see function load_module in load_utils.c for explanation of the following /* @see function load_module in load_utils.c for explanation of the following
* lint directives. * lint directives.
@ -46,7 +47,7 @@ MODULE_INFO info =
}; };
/*lint +e14 */ /*lint +e14 */
static char *version_str = "V1.0.0"; static char *version_str = "V1.1.0";
static int http_auth_set_protocol_data(DCB *dcb, GWBUF *buf); static int http_auth_set_protocol_data(DCB *dcb, GWBUF *buf);
static bool http_auth_is_client_ssl_capable(DCB *dcb); static bool http_auth_is_client_ssl_capable(DCB *dcb);
@ -62,6 +63,7 @@ static GWAUTHENTICATOR MyObject =
http_auth_is_client_ssl_capable, /* Check if client supports SSL */ http_auth_is_client_ssl_capable, /* Check if client supports SSL */
http_auth_authenticate, /* Authenticate user credentials */ http_auth_authenticate, /* Authenticate user credentials */
http_auth_free_client_data, /* Free the client data held in DCB */ http_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
}; };
typedef struct http_auth typedef struct http_auth

View File

@ -32,6 +32,7 @@
#include <dcb.h> #include <dcb.h>
#include <buffer.h> #include <buffer.h>
#include <adminusers.h> #include <adminusers.h>
#include <users.h>
/* @see function load_module in load_utils.c for explanation of the following /* @see function load_module in load_utils.c for explanation of the following
* lint directives. * lint directives.
@ -46,7 +47,7 @@ MODULE_INFO info =
}; };
/*lint +e14 */ /*lint +e14 */
static char *version_str = "V2.0.0"; static char *version_str = "V2.1.0";
static int max_admin_auth_set_protocol_data(DCB *dcb, GWBUF *buf); static int max_admin_auth_set_protocol_data(DCB *dcb, GWBUF *buf);
static bool max_admin_auth_is_client_ssl_capable(DCB *dcb); static bool max_admin_auth_is_client_ssl_capable(DCB *dcb);
@ -62,6 +63,7 @@ static GWAUTHENTICATOR MyObject =
max_admin_auth_is_client_ssl_capable, /* Check if client supports SSL */ max_admin_auth_is_client_ssl_capable, /* Check if client supports SSL */
max_admin_auth_authenticate, /* Authenticate user credentials */ max_admin_auth_authenticate, /* Authenticate user credentials */
max_admin_auth_free_client_data, /* Free the client data held in DCB */ max_admin_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
}; };
/** /**

View File

@ -30,6 +30,11 @@
#include <gw_authenticator.h> #include <gw_authenticator.h>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/poll.h> #include <maxscale/poll.h>
#include <dbusers.h>
#include <gwdirs.h>
#include <gw.h>
#include <secrets.h>
#include <utils.h>
/* @see function load_module in load_utils.c for explanation of the following /* @see function load_module in load_utils.c for explanation of the following
* lint directives. * lint directives.
@ -44,12 +49,13 @@ MODULE_INFO info =
}; };
/*lint +e14 */ /*lint +e14 */
static char *version_str = "V1.0.0"; static char *version_str = "V1.1.0";
static int mysql_auth_set_protocol_data(DCB *dcb, GWBUF *buf); static int mysql_auth_set_protocol_data(DCB *dcb, GWBUF *buf);
static bool mysql_auth_is_client_ssl_capable(DCB *dcb); static bool mysql_auth_is_client_ssl_capable(DCB *dcb);
static int mysql_auth_authenticate(DCB *dcb); static int mysql_auth_authenticate(DCB *dcb);
static void mysql_auth_free_client_data(DCB *dcb); static void mysql_auth_free_client_data(DCB *dcb);
static int mysql_auth_load_users(SERV_LISTENER *port);
/* /*
* The "module object" for mysql client authenticator module. * The "module object" for mysql client authenticator module.
@ -60,6 +66,7 @@ static GWAUTHENTICATOR MyObject =
mysql_auth_is_client_ssl_capable, /* Check if client supports SSL */ mysql_auth_is_client_ssl_capable, /* Check if client supports SSL */
mysql_auth_authenticate, /* Authenticate user credentials */ mysql_auth_authenticate, /* Authenticate user credentials */
mysql_auth_free_client_data, /* Free the client data held in DCB */ mysql_auth_free_client_data, /* Free the client data held in DCB */
mysql_auth_load_users /* Load users from backend databases */
}; };
static int combined_auth_check( static int combined_auth_check(
@ -811,3 +818,63 @@ mysql_auth_free_client_data(DCB *dcb)
{ {
MXS_FREE(dcb->data); MXS_FREE(dcb->data);
} }
/**
* @brief Load MySQL authentication users
*
* This function loads MySQL users from the backend database.
*
* @param port Listener definition
* @return AUTH_LOADUSERS_OK on success, AUTH_LOADUSERS_ERROR on error
*/
static int mysql_auth_load_users(SERV_LISTENER *port)
{
int rc = AUTH_LOADUSERS_OK;
SERVICE *service = port->listener->service;
int loaded = replace_mysql_users(port);
if (loaded < 0)
{
MXS_ERROR("[%s] Unable to load users for listener %s listening at %s:%d.", service->name,
port->name, port->address ? port->address : "0.0.0.0", port->port);
/* Try loading authentication data from file cache */
char path[PATH_MAX];
sprintf(path, "%s/%s/%s/%s/%s", get_cachedir(), service->name, port->name,
DBUSERS_DIR, DBUSERS_FILE);
if ((loaded = dbusers_load(port->users, path)) == -1)
{
MXS_ERROR("[%s] Failed to load cached users from '%s'.", service->name, path);;
rc = AUTH_LOADUSERS_ERROR;
}
else
{
MXS_WARNING("Using cached credential information.");
}
}
else
{
/* Users loaded successfully, save authentication data to file cache */
char path[PATH_MAX];
sprintf(path, "%s/%s/%s/%s/", get_cachedir(), service->name, port->name, DBUSERS_DIR);
if (mxs_mkdir_all(path, 0777))
{
strcat(path, DBUSERS_FILE);
dbusers_save(port->users, path);
}
}
if (loaded == 0)
{
MXS_WARNING("[%s]: failed to load any user information. Authentication"
" will probably fail as a result.", service->name);
}
else if (loaded > 0)
{
MXS_NOTICE("[%s] Loaded %d MySQL users for listener %s.", service->name, loaded, port->name);
}
return rc;
}

View File

@ -31,6 +31,7 @@
#include <modinfo.h> #include <modinfo.h>
#include <dcb.h> #include <dcb.h>
#include <buffer.h> #include <buffer.h>
#include <users.h>
/* @see function load_module in load_utils.c for explanation of the following /* @see function load_module in load_utils.c for explanation of the following
* lint directives. * lint directives.
@ -45,7 +46,7 @@ MODULE_INFO info =
}; };
/*lint +e14 */ /*lint +e14 */
static char *version_str = "V1.0.0"; static char *version_str = "V1.1.0";
static int null_auth_set_protocol_data(DCB *dcb, GWBUF *buf); static int null_auth_set_protocol_data(DCB *dcb, GWBUF *buf);
static bool null_auth_is_client_ssl_capable(DCB *dcb); static bool null_auth_is_client_ssl_capable(DCB *dcb);
@ -61,6 +62,7 @@ static GWAUTHENTICATOR MyObject =
null_auth_is_client_ssl_capable, /* Check if client supports SSL */ null_auth_is_client_ssl_capable, /* Check if client supports SSL */
null_auth_authenticate, /* Authenticate user credentials */ null_auth_authenticate, /* Authenticate user credentials */
null_auth_free_client_data, /* Free the client data held in DCB */ null_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
}; };
/** /**

View File

@ -31,6 +31,7 @@
#include <modinfo.h> #include <modinfo.h>
#include <dcb.h> #include <dcb.h>
#include <buffer.h> #include <buffer.h>
#include <users.h>
/* @see function load_module in load_utils.c for explanation of the following /* @see function load_module in load_utils.c for explanation of the following
* lint directives. * lint directives.
@ -45,7 +46,7 @@ MODULE_INFO info =
}; };
/*lint +e14 */ /*lint +e14 */
static char *version_str = "V1.0.0"; static char *version_str = "V1.1.0";
static int null_auth_set_protocol_data(DCB *dcb, GWBUF *buf); static int null_auth_set_protocol_data(DCB *dcb, GWBUF *buf);
static bool null_auth_is_client_ssl_capable(DCB *dcb); static bool null_auth_is_client_ssl_capable(DCB *dcb);
@ -61,6 +62,7 @@ static GWAUTHENTICATOR MyObject =
null_auth_is_client_ssl_capable, /* Check if client supports SSL */ null_auth_is_client_ssl_capable, /* Check if client supports SSL */
null_auth_authenticate, /* Authenticate user credentials */ null_auth_authenticate, /* Authenticate user credentials */
null_auth_free_client_data, /* Free the client data held in DCB */ null_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
}; };
/** /**

View File

@ -35,6 +35,7 @@
* 22/07/16 Massimiliano Pinto Added Semi-Sync replication support * 22/07/16 Massimiliano Pinto Added Semi-Sync replication support
* 24/08/16 Massimiliano Pinto Added slave notification state CS_WAIT_DATA. * 24/08/16 Massimiliano Pinto Added slave notification state CS_WAIT_DATA.
* State CS_UPTODATE removed. * State CS_UPTODATE removed.
* 01/09/2016 Massimiliano Pinto Added support for ANNOTATE_ROWS_EVENT in COM_BINLOG_DUMP
* *
* @endverbatim * @endverbatim
*/ */
@ -124,6 +125,11 @@
#define LOG_EVENT_NO_FILTER_F 0x0100 #define LOG_EVENT_NO_FILTER_F 0x0100
#define LOG_EVENT_MTS_ISOLATE_F 0x0200 #define LOG_EVENT_MTS_ISOLATE_F 0x0200
/**
* Binlog COM_BINLOG_DUMP flags
*/
#define BLR_REQUEST_ANNOTATE_ROWS_EVENT 2
/** /**
* How often to call the binlog status function (seconds) * How often to call the binlog status function (seconds)
*/ */

View File

@ -1792,7 +1792,7 @@ static void rses_end_locked_router_action(ROUTER_SLAVE *rses)
static int getCapabilities() static int getCapabilities()
{ {
return (int)RCAP_TYPE_NO_RSESSION; return (int)(RCAP_TYPE_NO_RSESSION | RCAP_TYPE_NO_USERS_INIT);
} }
/** /**

View File

@ -49,6 +49,7 @@
* 26/04/2016 Massimiliano Pinto Added MariaDB 10.0 and 10.1 GTID event flags detection * 26/04/2016 Massimiliano Pinto Added MariaDB 10.0 and 10.1 GTID event flags detection
* 22/07/2016 Massimiliano Pinto Added semi_sync replication support * 22/07/2016 Massimiliano Pinto Added semi_sync replication support
* 24/08/2016 Massimiliano Pinto Added slave notification and blr_distribute_binlog_record removed * 24/08/2016 Massimiliano Pinto Added slave notification and blr_distribute_binlog_record removed
* 01/09/2016 Massimiliano Pinto Added support for ANNOTATE_ROWS_EVENT in COM_BINLOG_DUMP
* *
* @endverbatim * @endverbatim
*/ */
@ -948,7 +949,18 @@ blr_make_binlog_dump(ROUTER_INSTANCE *router)
data[4] = COM_BINLOG_DUMP; // Command data[4] = COM_BINLOG_DUMP; // Command
encode_value(&data[5], encode_value(&data[5],
router->current_pos, 32); // binlog position router->current_pos, 32); // binlog position
encode_value(&data[9], 0, 16); // Flags
/* With mariadb10 always ask for annotate rows events */
if (router->mariadb10_compat)
{
// set flag for annotate rows event
encode_value(&data[9], BLR_REQUEST_ANNOTATE_ROWS_EVENT, 16);
}
else
{
encode_value(&data[9], 0, 16); // No flag set
}
encode_value(&data[11], encode_value(&data[11],
router->serverid, 32); // Server-id of MaxScale router->serverid, 32); // Server-id of MaxScale
memcpy((char *)&data[15], router->binlog_name, memcpy((char *)&data[15], router->binlog_name,

View File

@ -1279,8 +1279,14 @@ clear_server(DCB *dcb, SERVER *server, char *bit)
static void static void
reload_dbusers(DCB *dcb, SERVICE *service) reload_dbusers(DCB *dcb, SERVICE *service)
{ {
dcb_printf(dcb, "Loaded %d database users for service %s.\n", if (service_refresh_users(service) == 0)
reload_mysql_users(service->ports), service->name); {
dcb_printf(dcb, "Reloaded database users for service %s.\n", service->name);
}
else
{
dcb_printf(dcb, "Error: Failed to reloaded database users for service %s.\n", service->name);
}
} }
/** /**