Merge branch 'develop' into MXS-1209

This commit is contained in:
MassimilianoPinto
2017-05-03 12:54:19 +02:00
15 changed files with 176 additions and 82 deletions

View File

@ -55,6 +55,12 @@ The Avrorouter now supports the `deflate` compression method. This allows the
stored Avro format files to be compressed on disk. For more information, refer stored Avro format files to be compressed on disk. For more information, refer
to the [Avrorouter](../Routers/Avrorouter.md) documentation. to the [Avrorouter](../Routers/Avrorouter.md) documentation.
### Preliminary proxy protocol support
The MySQL backend protocol module now supports sending a proxy protocol header
to the server. For more information, see the server section in the
[Configuration guide](../Getting-Started/Configuration-Guide.md).
## Bug fixes ## Bug fixes
[Here is a list of bugs fixed since the release of MaxScale 2.1.X.]() [Here is a list of bugs fixed since the release of MaxScale 2.1.X.]()

View File

@ -44,6 +44,7 @@ MXS_BEGIN_DECLS
* @return The value of variable before the add occurred * @return The value of variable before the add occurred
*/ */
int atomic_add(int *variable, int value); int atomic_add(int *variable, int value);
uint32_t atomic_add_uint32(uint32_t *variable, int32_t value);
int64_t atomic_add_int64(int64_t *variable, int64_t value); int64_t atomic_add_int64(int64_t *variable, int64_t value);
uint64_t atomic_add_uint64(uint64_t *variable, int64_t value); uint64_t atomic_add_uint64(uint64_t *variable, int64_t value);

View File

@ -313,7 +313,7 @@ int dcb_isvalid(DCB *); /* Check the DCB is in the linked li
int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */ int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */
int dcb_persistent_clean_count(DCB *, int, bool); /* Clean persistent and return count */ int dcb_persistent_clean_count(DCB *, int, bool); /* Clean persistent and return count */
void dcb_hangup_foreach (struct server* server); void dcb_hangup_foreach (struct server* server);
size_t dcb_get_session_id(DCB* dcb); uint32_t dcb_get_session_id(DCB* dcb);
char *dcb_role_name(DCB *); /* Return the name of a role */ char *dcb_role_name(DCB *); /* Return the name of a role */
int dcb_accept_SSL(DCB* dcb); int dcb_accept_SSL(DCB* dcb);
int dcb_connect_SSL(DCB* dcb); int dcb_connect_SSL(DCB* dcb);

View File

@ -320,7 +320,7 @@ typedef struct
uint32_t server_capabilities; /*< server capabilities, created or received */ uint32_t server_capabilities; /*< server capabilities, created or received */
uint32_t client_capabilities; /*< client capabilities, created or received */ uint32_t client_capabilities; /*< client capabilities, created or received */
uint32_t extra_capabilities; /*< MariaDB 10.2 capabilities */ uint32_t extra_capabilities; /*< MariaDB 10.2 capabilities */
unsigned long tid; /*< MySQL Thread ID, in handshake */ uint32_t tid; /*< MySQL Thread ID, in handshake */
unsigned int charset; /*< MySQL character set at connect time */ unsigned int charset; /*< MySQL character set at connect time */
int ignore_replies; /*< How many replies should be discarded */ int ignore_replies; /*< How many replies should be discarded */
GWBUF* stored_query; /*< Temporarily stored queries */ GWBUF* stored_query; /*< Temporarily stored queries */

View File

@ -133,7 +133,7 @@ typedef struct session
{ {
skygw_chk_t ses_chk_top; skygw_chk_t ses_chk_top;
mxs_session_state_t state; /*< Current descriptor state */ mxs_session_state_t state; /*< Current descriptor state */
size_t ses_id; /*< Unique session identifier */ uint32_t ses_id; /*< Unique session identifier */
struct dcb *client_dcb; /*< The client connection */ struct dcb *client_dcb; /*< The client connection */
struct mxs_router_session *router_session; /*< The router instance data */ struct mxs_router_session *router_session; /*< The router instance data */
MXS_SESSION_STATS stats; /*< Session statistics */ MXS_SESSION_STATS stats; /*< Session statistics */
@ -171,7 +171,30 @@ typedef struct session
((sess)->tail.clientReply)((sess)->tail.instance, \ ((sess)->tail.clientReply)((sess)->tail.instance, \
(sess)->tail.session, (buf)) (sess)->tail.session, (buf))
/**
* Allocate a new session for a new client of the specified service.
*
* Create the link to the router session by calling the newSession
* entry point of the router using the router instance of the
* service this session is part of.
*
* @param service The service this connection was established by
* @param client_dcb The client side DCB
* @return The newly created session or NULL if an error occurred
*/
MXS_SESSION *session_alloc(struct service *, struct dcb *); MXS_SESSION *session_alloc(struct service *, struct dcb *);
/**
* A version of session_alloc() which takes the session id number as parameter.
* The id should have been generated with session_get_next_id().
*
* @param service The service this connection was established by
* @param client_dcb The client side DCB
* @param id Id for the new session.
* @return The newly created session or NULL if an error occurred
*/
MXS_SESSION *session_alloc_with_id(struct service *, struct dcb *, uint32_t);
MXS_SESSION *session_set_dummy(struct dcb *); MXS_SESSION *session_set_dummy(struct dcb *);
const char *session_get_remote(const MXS_SESSION *); const char *session_get_remote(const MXS_SESSION *);
@ -328,7 +351,14 @@ static inline bool session_set_autocommit(MXS_SESSION* ses, bool autocommit)
* *
* @note The caller must free the session reference by calling session_put_ref * @note The caller must free the session reference by calling session_put_ref
*/ */
MXS_SESSION* session_get_by_id(int id); MXS_SESSION* session_get_by_id(uint32_t id);
/**
* Get the next available unique (assuming no overflow) session id number.
*
* @return An unused session id.
*/
uint32_t session_get_next_id();
/** /**
* @brief Close a session * @brief Close a session

View File

@ -116,4 +116,9 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1,
*/ */
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
// These automatically act on the currently executing worker thread. Not implemented yet.
void mxs_add_to_session_map(uint32_t id, MXS_SESSION* session);
void mxs_remove_from_session_map(uint32_t id);
MXS_SESSION* mxs_find_in_session_map(uint32_t id);
MXS_END_DECLS MXS_END_DECLS

View File

@ -26,6 +26,15 @@ int atomic_add(int *variable, int value)
#endif #endif
} }
uint32_t atomic_add_uint32(uint32_t *variable, int32_t value)
{
#ifdef MXS_USE_ATOMIC_BUILTINS
return __atomic_fetch_add(variable, value, __ATOMIC_SEQ_CST);
#else
return __sync_fetch_and_add(variable, value);
#endif
}
int64_t atomic_add_int64(int64_t *variable, int64_t value) int64_t atomic_add_int64(int64_t *variable, int64_t value)
{ {
#ifdef MXS_USE_ATOMIC_BUILTINS #ifdef MXS_USE_ATOMIC_BUILTINS

View File

@ -24,6 +24,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
#include <inttypes.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <signal.h> #include <signal.h>
#include <stdarg.h> #include <stdarg.h>
@ -162,8 +163,7 @@ static uint32_t dcb_process_poll_events(DCB *dcb, int thread_id, uint32_t ev);
static void dcb_process_fake_events(DCB *dcb, int thread_id); static void dcb_process_fake_events(DCB *dcb, int thread_id);
static bool dcb_session_check(DCB *dcb, const char *); static bool dcb_session_check(DCB *dcb, const char *);
size_t dcb_get_session_id( uint32_t dcb_get_session_id(DCB *dcb)
DCB *dcb)
{ {
return (dcb && dcb->session) ? dcb->session->ses_id : 0; return (dcb && dcb->session) ? dcb->session->ses_id : 0;
} }
@ -1673,7 +1673,7 @@ dprintDCB(DCB *pdcb, DCB *dcb)
if (dcb->session && dcb->session->state != SESSION_STATE_DUMMY) if (dcb->session && dcb->session->state != SESSION_STATE_DUMMY)
{ {
dcb_printf(pdcb, "\tOwning Session: %lu\n", dcb->session->ses_id); dcb_printf(pdcb, "\tOwning Session: %" PRIu32 "\n", dcb->session->ses_id);
} }
if (dcb->writeq) if (dcb->writeq)
@ -3069,7 +3069,7 @@ private:
bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data) bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data)
{ {
SerialDcbTask task(func, data); SerialDcbTask task(func, data);
Worker::execute_on_all_serially(&task); Worker::execute_serially(task);
return task.more(); return task.more();
} }
@ -3104,9 +3104,8 @@ private:
void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data) void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data)
{ {
Semaphore sem;
ParallelDcbTask task(func, data); ParallelDcbTask task(func, data);
sem.wait_n(Worker::execute_on_all(&task, &sem)); Worker::execute_concurrently(task);
} }
int dcb_get_port(const DCB *dcb) int dcb_get_port(const DCB *dcb)

View File

@ -14,7 +14,9 @@
#include <maxscale/cppdefs.hh> #include <maxscale/cppdefs.hh>
#include <memory> #include <memory>
#include <tr1/unordered_map>
#include <maxscale/platform.h> #include <maxscale/platform.h>
#include <maxscale/session.h>
#include "messagequeue.hh" #include "messagequeue.hh"
#include "poll.h" #include "poll.h"
#include "worker.h" #include "worker.h"
@ -67,6 +69,7 @@ public:
typedef WORKER_STATISTICS STATISTICS; typedef WORKER_STATISTICS STATISTICS;
typedef WorkerTask Task; typedef WorkerTask Task;
typedef WorkerDisposableTask DisposableTask; typedef WorkerDisposableTask DisposableTask;
typedef std::tr1::unordered_map<uint32_t, MXS_SESSION*> SessionsById;
enum state_t enum state_t
{ {
@ -251,12 +254,12 @@ public:
} }
/** /**
* Executes a task in the context of a Worker. * Posts a task to a worker for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* @param pSem If non-NULL, will be posted once the task's `execute` return. * @param pSem If non-NULL, will be posted once the task's `execute` return.
* *
* @return True if the task could be *posted*, false otherwise. * @return True if the task could be posted (i.e. not executed), false otherwise.
* *
* @attention The instance must remain valid for as long as it takes for the * @attention The instance must remain valid for as long as it takes for the
* task to be transferred to the worker and its `execute` function * task to be transferred to the worker and its `execute` function
@ -274,21 +277,21 @@ public:
* MyResult& result = task.result(); * MyResult& result = task.result();
* @endcode * @endcode
*/ */
bool execute(Task* pTask, Semaphore* pSem = NULL); bool post(Task* pTask, Semaphore* pSem = NULL);
/** /**
* Executes a disposable task in the context of a Worker. * Posts a task to a worker for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* *
* @return True if the task could be *posted*, false otherwise. * @return True if the task could be posted (i.e. not executed), false otherwise.
* *
* @attention Once the task has been executed, it will be deleted. * @attention Once the task has been executed, it will be deleted.
*/ */
bool execute(std::auto_ptr<DisposableTask> sTask); bool post(std::auto_ptr<DisposableTask> sTask);
/** /**
* Executes a task on all workers. * Posts a task to all workers for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* @param pSem If non-NULL, will be posted once per worker when the task's * @param pSem If non-NULL, will be posted once per worker when the task's
@ -301,10 +304,10 @@ public:
* have data specific to each worker that can be accessed * have data specific to each worker that can be accessed
* without locks. * without locks.
*/ */
static size_t execute_on_all(Task* pTask, Semaphore* pSem = NULL); static size_t broadcast(Task* pTask, Semaphore* pSem = NULL);
/** /**
* Executes a task on all workers. * Posts a task to all workers for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* *
@ -318,14 +321,14 @@ public:
* @attention Once the task has been executed by all workers, it will * @attention Once the task has been executed by all workers, it will
* be deleted. * be deleted.
*/ */
static size_t execute_on_all(std::auto_ptr<DisposableTask> sTask); static size_t broadcast(std::auto_ptr<DisposableTask> sTask);
/** /**
* Executes a task on all workers in serial mode. * Executes a task on all workers in serial mode (the task is executed
* on at most one worker thread at a time). When the function returns
* the task has been executed on all workers.
* *
* The task is executed on at most one worker thread at a time. * @param task The task to be executed.
*
* @param pTask The task to be executed.
* *
* @return How many workers the task was posted to. * @return How many workers the task was posted to.
* *
@ -333,7 +336,18 @@ public:
* to the other functions. Only use this function when printing thread-specific * to the other functions. Only use this function when printing thread-specific
* data to stdout. * data to stdout.
*/ */
static size_t execute_on_all_serially(Task* pTask); static size_t execute_serially(Task& task);
/**
* Executes a task on all workers concurrently and waits until all workers
* are done. That is, when the function returns the task has been executed
* by all workers.
*
* @param task The task to be executed.
*
* @return How many workers the task was posted to.
*/
static size_t execute_concurrently(Task& task);
/** /**
* Post a message to a worker. * Post a message to a worker.
@ -428,7 +442,7 @@ private:
static Worker* create(int id, int epoll_listener_fd); static Worker* create(int id, int epoll_listener_fd);
bool execute_disposable(DisposableTask* pTask); bool post_disposable(DisposableTask* pTask);
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
@ -449,6 +463,11 @@ private:
bool m_started; /*< Whether the thread has been started or not. */ bool m_started; /*< Whether the thread has been started or not. */
bool m_should_shutdown; /*< Whether shutdown should be performed. */ bool m_should_shutdown; /*< Whether shutdown should be performed. */
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map
* should contain sessions exclusive to this
* worker and not e.g. listener sessions. For now,
* it's up to the protocol to decide whether a new
* session is added to the map. */
}; };
} }

View File

@ -297,7 +297,7 @@ static bool process_argument(const MODULECMD *cmd, modulecmd_arg_type_t *type, c
break; break;
case MODULECMD_ARG_SESSION: case MODULECMD_ARG_SESSION:
if ((arg->value.session = session_get_by_id(atoi((const char*)value)))) if ((arg->value.session = session_get_by_id(strtoul((const char*)value, NULL, 0))))
{ {
arg->type.type = MODULECMD_ARG_SESSION; arg->type.type = MODULECMD_ARG_SESSION;
} }

View File

@ -29,6 +29,7 @@
*/ */
#include <maxscale/session.h> #include <maxscale/session.h>
#include <inttypes.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
@ -48,8 +49,10 @@
#include "maxscale/session.h" #include "maxscale/session.h"
#include "maxscale/filter.h" #include "maxscale/filter.h"
/** Global session id; updated safely by use of atomic_add */ /** Global session id counter. Must be updated atomically. Value 0 is reserved for
static int session_id; * dummy/unused sessions.
*/
static uint32_t next_session_id = 1;
static struct session session_dummy_struct; static struct session session_dummy_struct;
@ -59,7 +62,8 @@ static void session_simple_free(MXS_SESSION *session, DCB *dcb);
static void session_add_to_all_list(MXS_SESSION *session); static void session_add_to_all_list(MXS_SESSION *session);
static MXS_SESSION *session_find_free(); static MXS_SESSION *session_find_free();
static void session_final_free(MXS_SESSION *session); static void session_final_free(MXS_SESSION *session);
static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
MXS_SESSION* session);
/** /**
* The clientReply of the session. * The clientReply of the session.
* *
@ -87,30 +91,35 @@ session_initialize(MXS_SESSION *session)
session->ses_chk_tail = CHK_NUM_SESSION; session->ses_chk_tail = CHK_NUM_SESSION;
} }
/** MXS_SESSION* session_alloc(SERVICE *service, DCB *client_dcb)
* Allocate a new session for a new client of the specified service.
*
* Create the link to the router session by calling the newSession
* entry point of the router using the router instance of the
* service this session is part of.
*
* @param service The service this connection was established by
* @param client_dcb The client side DCB
* @return The newly created session or NULL if an error occured
*/
MXS_SESSION *
session_alloc(SERVICE *service, DCB *client_dcb)
{ {
MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session))); MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session)));
if (NULL == session) if (NULL == session)
{ {
return NULL; return NULL;
} }
session_initialize(session);
/** Assign a session id and increase */ session_initialize(session);
session->ses_id = (size_t)atomic_add(&session_id, 1) + 1; session->ses_id = session_get_next_id();
return session_alloc_body(service, client_dcb, session);
}
MXS_SESSION* session_alloc_with_id(SERVICE *service, DCB *client_dcb, uint32_t id)
{
MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session)));
if (session == NULL)
{
return NULL;
}
session_initialize(session);
session->ses_id = id;
return session_alloc_body(service, client_dcb, session);
}
static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
MXS_SESSION* session)
{
session->ses_is_child = (bool) DCB_IS_CLONE(client_dcb); session->ses_is_child = (bool) DCB_IS_CLONE(client_dcb);
session->service = service; session->service = service;
session->client_dcb = client_dcb; session->client_dcb = client_dcb;
@ -134,14 +143,13 @@ session_alloc(SERVICE *service, DCB *client_dcb)
session->trx_state = SESSION_TRX_INACTIVE; session->trx_state = SESSION_TRX_INACTIVE;
session->autocommit = true; session->autocommit = true;
/* /*
* Only create a router session if we are not the listening * Only create a router session if we are not the listening DCB or an
* DCB or an internal DCB. Creating a router session may create a connection to a * internal DCB. Creating a router session may create a connection to
* backend server, depending upon the router module implementation * a backend server, depending upon the router module implementation
* and should be avoided for the listener session * and should be avoided for a listener session.
* *
* Router session creation may create other DCBs that link to the * Router session creation may create other DCBs that link to the
* session, therefore it is important that the session lock is * session.
* relinquished before the router call.
*/ */
if (client_dcb->state != DCB_STATE_LISTENING && if (client_dcb->state != DCB_STATE_LISTENING &&
client_dcb->dcb_role != DCB_ROLE_INTERNAL) client_dcb->dcb_role != DCB_ROLE_INTERNAL)
@ -163,7 +171,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
* protocol end of the chain. * protocol end of the chain.
*/ */
// NOTE: Here we cast the router instance into a MXS_FILTER and // NOTE: Here we cast the router instance into a MXS_FILTER and
// NOTE: and the router session into a MXS_FILTER_SESSION and // NOTE: the router session into a MXS_FILTER_SESSION and
// NOTE: the router routeQuery into a filter routeQuery. That // NOTE: the router routeQuery into a filter routeQuery. That
// NOTE: is in order to be able to treat the router as the first // NOTE: is in order to be able to treat the router as the first
// NOTE: filter. // NOTE: filter.
@ -197,13 +205,13 @@ session_alloc(SERVICE *service, DCB *client_dcb)
if (session->client_dcb->user == NULL) if (session->client_dcb->user == NULL)
{ {
MXS_INFO("Started session [%lu] for %s service ", MXS_INFO("Started session [%" PRIu32 "] for %s service ",
session->ses_id, session->ses_id,
service->name); service->name);
} }
else else
{ {
MXS_INFO("Started %s client session [%lu] for '%s' from %s", MXS_INFO("Started %s client session [%" PRIu32 "] for '%s' from %s",
service->name, service->name,
session->ses_id, session->ses_id,
session->client_dcb->user, session->client_dcb->user,
@ -212,7 +220,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
} }
else else
{ {
MXS_INFO("Start %s client session [%lu] for '%s' from %s failed, will be " MXS_INFO("Start %s client session [%" PRIu32 "] for '%s' from %s failed, will be "
"closed as soon as all related DCBs have been closed.", "closed as soon as all related DCBs have been closed.",
service->name, service->name,
session->ses_id, session->ses_id,
@ -224,7 +232,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
CHK_SESSION(session); CHK_SESSION(session);
client_dcb->session = session; client_dcb->session = session;
return SESSION_STATE_TO_BE_FREED == session->state ? NULL : session; return (session->state == SESSION_STATE_TO_BE_FREED) ? NULL : session;
} }
/** /**
@ -382,7 +390,7 @@ static void session_free(MXS_SESSION *session)
MXS_FREE(session->filters); MXS_FREE(session->filters);
} }
MXS_INFO("Stopped %s client session [%lu]", session->service->name, session->ses_id); MXS_INFO("Stopped %s client session [%" PRIu32 "]", session->service->name, session->ses_id);
/** If session doesn't have parent referencing to it, it can be freed */ /** If session doesn't have parent referencing to it, it can be freed */
if (!session->ses_is_child) if (!session->ses_is_child)
@ -494,7 +502,7 @@ dprintSession(DCB *dcb, MXS_SESSION *print_session)
char buf[30]; char buf[30];
int i; int i;
dcb_printf(dcb, "Session %lu\n", print_session->ses_id); dcb_printf(dcb, "Session %" PRIu32 "\n", print_session->ses_id);
dcb_printf(dcb, "\tState: %s\n", session_state(print_session->state)); dcb_printf(dcb, "\tState: %s\n", session_state(print_session->state));
dcb_printf(dcb, "\tService: %s\n", print_session->service->name); dcb_printf(dcb, "\tService: %s\n", print_session->service->name);
@ -534,7 +542,7 @@ bool dListSessions_cb(DCB *dcb, void *data)
{ {
DCB *out_dcb = (DCB*)data; DCB *out_dcb = (DCB*)data;
MXS_SESSION *session = dcb->session; MXS_SESSION *session = dcb->session;
dcb_printf(out_dcb, "%-16lu | %-15s | %-14s | %s\n", session->ses_id, dcb_printf(out_dcb, "%-16" PRIu32 " | %-15s | %-14s | %s\n", session->ses_id,
session->client_dcb && session->client_dcb->remote ? session->client_dcb && session->client_dcb->remote ?
session->client_dcb->remote : "", session->client_dcb->remote : "",
session->service && session->service->name ? session->service && session->service->name ?
@ -898,7 +906,7 @@ static bool ses_find_id(DCB *dcb, void *data)
{ {
void **params = (void**)data; void **params = (void**)data;
MXS_SESSION **ses = (MXS_SESSION**)params[0]; MXS_SESSION **ses = (MXS_SESSION**)params[0];
size_t *id = (size_t*)params[1]; uint32_t *id = (uint32_t*)params[1];
bool rval = true; bool rval = true;
if (dcb->session->ses_id == *id) if (dcb->session->ses_id == *id)
@ -910,7 +918,7 @@ static bool ses_find_id(DCB *dcb, void *data)
return rval; return rval;
} }
MXS_SESSION* session_get_by_id(int id) MXS_SESSION* session_get_by_id(uint32_t id)
{ {
MXS_SESSION *session = NULL; MXS_SESSION *session = NULL;
void *params[] = {&session, &id}; void *params[] = {&session, &id};
@ -981,3 +989,8 @@ void session_clear_stmt(MXS_SESSION *session)
session->stmt.buffer = NULL; session->stmt.buffer = NULL;
session->stmt.target = NULL; session->stmt.target = NULL;
} }
uint32_t session_get_next_id()
{
return atomic_add_uint32(&next_session_id, 1);
}

View File

@ -548,21 +548,23 @@ void Worker::set_maxwait(unsigned int maxwait)
this_unit.max_poll_sleep = maxwait; this_unit.max_poll_sleep = maxwait;
} }
bool Worker::execute(Task* pTask, Semaphore* pSem) bool Worker::post(Task* pTask, Semaphore* pSem)
{ {
// No logging here, function must be signal safe.
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask); intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
intptr_t arg2 = reinterpret_cast<intptr_t>(pSem); intptr_t arg2 = reinterpret_cast<intptr_t>(pSem);
return post_message(MXS_WORKER_MSG_TASK, arg1, arg2); return post_message(MXS_WORKER_MSG_TASK, arg1, arg2);
} }
bool Worker::execute(std::auto_ptr<DisposableTask> sTask) bool Worker::post(std::auto_ptr<DisposableTask> sTask)
{ {
return execute_disposable(sTask.release()); // No logging here, function must be signal safe.
return post_disposable(sTask.release());
} }
// private // private
bool Worker::execute_disposable(DisposableTask* pTask) bool Worker::post_disposable(DisposableTask* pTask)
{ {
pTask->inc_ref(); pTask->inc_ref();
@ -579,15 +581,16 @@ bool Worker::execute_disposable(DisposableTask* pTask)
} }
//static //static
size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem) size_t Worker::broadcast(Task* pTask, Semaphore* pSem)
{ {
// No logging here, function must be signal safe.
size_t n = 0; size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute(pTask, pSem)) if (pWorker->post(pTask, pSem))
{ {
++n; ++n;
} }
@ -597,7 +600,7 @@ size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem)
} }
//static //static
size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask) size_t Worker::broadcast(std::auto_ptr<DisposableTask> sTask)
{ {
DisposableTask* pTask = sTask.release(); DisposableTask* pTask = sTask.release();
pTask->inc_ref(); pTask->inc_ref();
@ -608,7 +611,7 @@ size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute_disposable(pTask)) if (pWorker->post_disposable(pTask))
{ {
++n; ++n;
} }
@ -618,8 +621,9 @@ size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask)
return n; return n;
} }
//static //static
size_t Worker::execute_on_all_serially(Task* pTask) size_t Worker::execute_serially(Task& task)
{ {
Semaphore sem; Semaphore sem;
size_t n = 0; size_t n = 0;
@ -628,7 +632,7 @@ size_t Worker::execute_on_all_serially(Task* pTask)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute(pTask, &sem)) if (pWorker->post(&task, &sem))
{ {
sem.wait(); sem.wait();
++n; ++n;
@ -638,6 +642,13 @@ size_t Worker::execute_on_all_serially(Task* pTask)
return n; return n;
} }
//static
size_t Worker::execute_concurrently(Task& task)
{
Semaphore sem;
return sem.wait_n(Worker::broadcast(&task, &sem));
}
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.

View File

@ -229,7 +229,6 @@ int MySQLSendHandshake(DCB* dcb)
char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1] = ""; char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1] = "";
char *version_string; char *version_string;
int len_version_string = 0; int len_version_string = 0;
int id_num;
bool is_maria = false; bool is_maria = false;
@ -275,10 +274,9 @@ int MySQLSendHandshake(DCB* dcb)
memcpy(mysql_filler_ten + 6, &new_flags, sizeof(new_flags)); memcpy(mysql_filler_ten + 6, &new_flags, sizeof(new_flags));
} }
// thread id, now put thePID // Get the equivalent of the server process id.
id_num = getpid() + dcb->fd; protocol->tid = session_get_next_id();
gw_mysql_set_byte4(mysql_thread_id_num, id_num); gw_mysql_set_byte4(mysql_thread_id_num, protocol->tid);
memcpy(mysql_scramble_buf, server_scramble, 8); memcpy(mysql_scramble_buf, server_scramble, 8);
memcpy(mysql_plugin_data, server_scramble + 8, 12); memcpy(mysql_plugin_data, server_scramble + 8, 12);
@ -702,7 +700,8 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
* is changed so that future data will go through the * is changed so that future data will go through the
* normal data handling function instead of this one. * normal data handling function instead of this one.
*/ */
MXS_SESSION *session = session_alloc(dcb->service, dcb); MXS_SESSION *session =
session_alloc_with_id(dcb->service, dcb, protocol->tid);
if (session != NULL) if (session != NULL)
{ {

View File

@ -1724,7 +1724,7 @@ convert_arg(char *arg, int arg_type)
break; break;
case ARG_TYPE_SESSION: case ARG_TYPE_SESSION:
rval = (unsigned long)session_get_by_id(strtol(arg, NULL, 0)); rval = (unsigned long)session_get_by_id(strtoul(arg, NULL, 0));
break; break;
case ARG_TYPE_MONITOR: case ARG_TYPE_MONITOR:

View File

@ -13,6 +13,8 @@
#include "schemarouter.hh" #include "schemarouter.hh"
#include <inttypes.h>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/query_classifier.h> #include <maxscale/query_classifier.h>
#include <maxscale/modutil.h> #include <maxscale/modutil.h>
@ -361,7 +363,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
if (m_config->debug) if (m_config->debug)
{ {
sprintf(errbuf + strlen(errbuf), sprintf(errbuf + strlen(errbuf),
" ([%lu]: DB change failed)", " ([%" PRIu32 "]: DB change failed)",
m_client->session->ses_id); m_client->session->ses_id);
} }
@ -989,7 +991,7 @@ bool SchemaRouterSession::handle_default_db()
sprintf(errmsg, "Unknown database '%s'", m_connect_db.c_str()); sprintf(errmsg, "Unknown database '%s'", m_connect_db.c_str());
if (m_config->debug) if (m_config->debug)
{ {
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", sprintf(errmsg + strlen(errmsg), " ([%" PRIu32 "]: DB not found on connect)",
m_client->session->ses_id); m_client->session->ses_id);
} }
write_error_to_client(m_client, write_error_to_client(m_client,