KILL [CONNECTION | QUERY] support, part1

Preparation for adding KILL syntax support.
Session id changed to uint32 everywhere. Added atomic op.
Session id can be acquired before session_alloc().
Added session_alloc_with_id(), which is given a session id number.
Worker object has a session_id->SESSION* mapping, not used yet.
This commit is contained in:
Esa Korhonen
2017-04-27 18:09:06 +03:00
parent 46da2d3ad2
commit bfd94c2b31
13 changed files with 120 additions and 53 deletions

View File

@ -44,6 +44,7 @@ MXS_BEGIN_DECLS
* @return The value of variable before the add occurred
*/
int atomic_add(int *variable, int value);
uint32_t atomic_add_uint32(uint32_t *variable, int32_t value);
int64_t atomic_add_int64(int64_t *variable, int64_t value);
uint64_t atomic_add_uint64(uint64_t *variable, int64_t value);

View File

@ -313,7 +313,7 @@ int dcb_isvalid(DCB *); /* Check the DCB is in the linked li
int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */
int dcb_persistent_clean_count(DCB *, int, bool); /* Clean persistent and return count */
void dcb_hangup_foreach (struct server* server);
size_t dcb_get_session_id(DCB* dcb);
uint32_t dcb_get_session_id(DCB* dcb);
char *dcb_role_name(DCB *); /* Return the name of a role */
int dcb_accept_SSL(DCB* dcb);
int dcb_connect_SSL(DCB* dcb);

View File

@ -320,7 +320,7 @@ typedef struct
uint32_t server_capabilities; /*< server capabilities, created or received */
uint32_t client_capabilities; /*< client capabilities, created or received */
uint32_t extra_capabilities; /*< MariaDB 10.2 capabilities */
unsigned long tid; /*< MySQL Thread ID, in handshake */
uint32_t tid; /*< MySQL Thread ID, in handshake */
unsigned int charset; /*< MySQL character set at connect time */
int ignore_replies; /*< How many replies should be discarded */
GWBUF* stored_query; /*< Temporarily stored queries */

View File

@ -133,7 +133,7 @@ typedef struct session
{
skygw_chk_t ses_chk_top;
mxs_session_state_t state; /*< Current descriptor state */
size_t ses_id; /*< Unique session identifier */
uint32_t ses_id; /*< Unique session identifier */
struct dcb *client_dcb; /*< The client connection */
struct mxs_router_session *router_session; /*< The router instance data */
MXS_SESSION_STATS stats; /*< Session statistics */
@ -171,7 +171,30 @@ typedef struct session
((sess)->tail.clientReply)((sess)->tail.instance, \
(sess)->tail.session, (buf))
/**
* Allocate a new session for a new client of the specified service.
*
* Create the link to the router session by calling the newSession
* entry point of the router using the router instance of the
* service this session is part of.
*
* @param service The service this connection was established by
* @param client_dcb The client side DCB
* @return The newly created session or NULL if an error occurred
*/
MXS_SESSION *session_alloc(struct service *, struct dcb *);
/**
* A version of session_alloc() which takes the session id number as parameter.
* The id should have been generated with session_get_next_id().
*
* @param service The service this connection was established by
* @param client_dcb The client side DCB
* @param id Id for the new session.
* @return The newly created session or NULL if an error occurred
*/
MXS_SESSION *session_alloc_with_id(struct service *, struct dcb *, uint32_t);
MXS_SESSION *session_set_dummy(struct dcb *);
const char *session_get_remote(const MXS_SESSION *);
@ -328,7 +351,14 @@ static inline bool session_set_autocommit(MXS_SESSION* ses, bool autocommit)
*
* @note The caller must free the session reference by calling session_put_ref
*/
MXS_SESSION* session_get_by_id(int id);
MXS_SESSION* session_get_by_id(uint32_t id);
/**
* Get the next available unique (assuming no overflow) session id number.
*
* @return An unused session id.
*/
uint32_t session_get_next_id();
/**
* @brief Close a session

View File

@ -116,4 +116,9 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1,
*/
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
// These automatically act on the currently executing worker thread. Not implemented yet.
void mxs_add_to_session_map(uint32_t id, MXS_SESSION* session);
void mxs_remove_from_session_map(uint32_t id);
MXS_SESSION* mxs_find_in_session_map(uint32_t id);
MXS_END_DECLS

View File

@ -26,6 +26,15 @@ int atomic_add(int *variable, int value)
#endif
}
uint32_t atomic_add_uint32(uint32_t *variable, int32_t value)
{
#ifdef MXS_USE_ATOMIC_BUILTINS
return __atomic_fetch_add(variable, value, __ATOMIC_SEQ_CST);
#else
return __sync_fetch_and_add(variable, value);
#endif
}
int64_t atomic_add_int64(int64_t *variable, int64_t value)
{
#ifdef MXS_USE_ATOMIC_BUILTINS

View File

@ -24,6 +24,7 @@
#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <netinet/tcp.h>
#include <signal.h>
#include <stdarg.h>
@ -162,8 +163,7 @@ static uint32_t dcb_process_poll_events(DCB *dcb, int thread_id, uint32_t ev);
static void dcb_process_fake_events(DCB *dcb, int thread_id);
static bool dcb_session_check(DCB *dcb, const char *);
size_t dcb_get_session_id(
DCB *dcb)
uint32_t dcb_get_session_id(DCB *dcb)
{
return (dcb && dcb->session) ? dcb->session->ses_id : 0;
}
@ -1673,7 +1673,7 @@ dprintDCB(DCB *pdcb, DCB *dcb)
if (dcb->session && dcb->session->state != SESSION_STATE_DUMMY)
{
dcb_printf(pdcb, "\tOwning Session: %lu\n", dcb->session->ses_id);
dcb_printf(pdcb, "\tOwning Session: %" PRIu32 "\n", dcb->session->ses_id);
}
if (dcb->writeq)

View File

@ -14,7 +14,9 @@
#include <maxscale/cppdefs.hh>
#include <memory>
#include <tr1/unordered_map>
#include <maxscale/platform.h>
#include <maxscale/session.h>
#include "messagequeue.hh"
#include "poll.h"
#include "worker.h"
@ -67,6 +69,7 @@ public:
typedef WORKER_STATISTICS STATISTICS;
typedef WorkerTask Task;
typedef WorkerDisposableTask DisposableTask;
typedef std::tr1::unordered_map<uint32_t, MXS_SESSION*> SessionsById;
enum state_t
{
@ -449,6 +452,11 @@ private:
bool m_started; /*< Whether the thread has been started or not. */
bool m_should_shutdown; /*< Whether shutdown should be performed. */
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map
* should contain sessions exclusive to this
* worker and not e.g. listener sessions. For now,
* it's up to the protocol to decide whether a new
* session is added to the map. */
};
}

View File

@ -297,7 +297,7 @@ static bool process_argument(const MODULECMD *cmd, modulecmd_arg_type_t *type, c
break;
case MODULECMD_ARG_SESSION:
if ((arg->value.session = session_get_by_id(atoi((const char*)value))))
if ((arg->value.session = session_get_by_id(strtoul((const char*)value, NULL, 0))))
{
arg->type.type = MODULECMD_ARG_SESSION;
}

View File

@ -29,6 +29,7 @@
*/
#include <maxscale/session.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
@ -48,8 +49,10 @@
#include "maxscale/session.h"
#include "maxscale/filter.h"
/** Global session id; updated safely by use of atomic_add */
static int session_id;
/** Global session id counter. Must be updated atomically. Value 0 is reserved for
* dummy/unused sessions.
*/
static uint32_t next_session_id = 1;
static struct session session_dummy_struct;
@ -59,7 +62,8 @@ static void session_simple_free(MXS_SESSION *session, DCB *dcb);
static void session_add_to_all_list(MXS_SESSION *session);
static MXS_SESSION *session_find_free();
static void session_final_free(MXS_SESSION *session);
static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
MXS_SESSION* session);
/**
* The clientReply of the session.
*
@ -87,30 +91,35 @@ session_initialize(MXS_SESSION *session)
session->ses_chk_tail = CHK_NUM_SESSION;
}
/**
* Allocate a new session for a new client of the specified service.
*
* Create the link to the router session by calling the newSession
* entry point of the router using the router instance of the
* service this session is part of.
*
* @param service The service this connection was established by
* @param client_dcb The client side DCB
* @return The newly created session or NULL if an error occured
*/
MXS_SESSION *
session_alloc(SERVICE *service, DCB *client_dcb)
MXS_SESSION* session_alloc(SERVICE *service, DCB *client_dcb)
{
MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session)));
if (NULL == session)
{
return NULL;
}
session_initialize(session);
/** Assign a session id and increase */
session->ses_id = (size_t)atomic_add(&session_id, 1) + 1;
session_initialize(session);
session->ses_id = session_get_next_id();
return session_alloc_body(service, client_dcb, session);
}
MXS_SESSION* session_alloc_with_id(SERVICE *service, DCB *client_dcb, uint32_t id)
{
MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session)));
if (session == NULL)
{
return NULL;
}
session_initialize(session);
session->ses_id = id;
return session_alloc_body(service, client_dcb, session);
}
static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
MXS_SESSION* session)
{
session->ses_is_child = (bool) DCB_IS_CLONE(client_dcb);
session->service = service;
session->client_dcb = client_dcb;
@ -134,14 +143,13 @@ session_alloc(SERVICE *service, DCB *client_dcb)
session->trx_state = SESSION_TRX_INACTIVE;
session->autocommit = true;
/*
* Only create a router session if we are not the listening
* DCB or an internal DCB. Creating a router session may create a connection to a
* backend server, depending upon the router module implementation
* and should be avoided for the listener session
* Only create a router session if we are not the listening DCB or an
* internal DCB. Creating a router session may create a connection to
* a backend server, depending upon the router module implementation
* and should be avoided for a listener session.
*
* Router session creation may create other DCBs that link to the
* session, therefore it is important that the session lock is
* relinquished before the router call.
* session.
*/
if (client_dcb->state != DCB_STATE_LISTENING &&
client_dcb->dcb_role != DCB_ROLE_INTERNAL)
@ -163,7 +171,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
* protocol end of the chain.
*/
// NOTE: Here we cast the router instance into a MXS_FILTER and
// NOTE: and the router session into a MXS_FILTER_SESSION and
// NOTE: the router session into a MXS_FILTER_SESSION and
// NOTE: the router routeQuery into a filter routeQuery. That
// NOTE: is in order to be able to treat the router as the first
// NOTE: filter.
@ -197,13 +205,13 @@ session_alloc(SERVICE *service, DCB *client_dcb)
if (session->client_dcb->user == NULL)
{
MXS_INFO("Started session [%lu] for %s service ",
MXS_INFO("Started session [%" PRIu32 "] for %s service ",
session->ses_id,
service->name);
}
else
{
MXS_INFO("Started %s client session [%lu] for '%s' from %s",
MXS_INFO("Started %s client session [%" PRIu32 "] for '%s' from %s",
service->name,
session->ses_id,
session->client_dcb->user,
@ -212,7 +220,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
}
else
{
MXS_INFO("Start %s client session [%lu] for '%s' from %s failed, will be "
MXS_INFO("Start %s client session [%" PRIu32 "] for '%s' from %s failed, will be "
"closed as soon as all related DCBs have been closed.",
service->name,
session->ses_id,
@ -224,7 +232,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
CHK_SESSION(session);
client_dcb->session = session;
return SESSION_STATE_TO_BE_FREED == session->state ? NULL : session;
return (session->state == SESSION_STATE_TO_BE_FREED) ? NULL : session;
}
/**
@ -382,7 +390,7 @@ static void session_free(MXS_SESSION *session)
MXS_FREE(session->filters);
}
MXS_INFO("Stopped %s client session [%lu]", session->service->name, session->ses_id);
MXS_INFO("Stopped %s client session [%" PRIu32 "]", session->service->name, session->ses_id);
/** If session doesn't have parent referencing to it, it can be freed */
if (!session->ses_is_child)
@ -494,7 +502,7 @@ dprintSession(DCB *dcb, MXS_SESSION *print_session)
char buf[30];
int i;
dcb_printf(dcb, "Session %lu\n", print_session->ses_id);
dcb_printf(dcb, "Session %" PRIu32 "\n", print_session->ses_id);
dcb_printf(dcb, "\tState: %s\n", session_state(print_session->state));
dcb_printf(dcb, "\tService: %s\n", print_session->service->name);
@ -534,7 +542,7 @@ bool dListSessions_cb(DCB *dcb, void *data)
{
DCB *out_dcb = (DCB*)data;
MXS_SESSION *session = dcb->session;
dcb_printf(out_dcb, "%-16lu | %-15s | %-14s | %s\n", session->ses_id,
dcb_printf(out_dcb, "%-16" PRIu32 " | %-15s | %-14s | %s\n", session->ses_id,
session->client_dcb && session->client_dcb->remote ?
session->client_dcb->remote : "",
session->service && session->service->name ?
@ -898,7 +906,7 @@ static bool ses_find_id(DCB *dcb, void *data)
{
void **params = (void**)data;
MXS_SESSION **ses = (MXS_SESSION**)params[0];
size_t *id = (size_t*)params[1];
uint32_t *id = (uint32_t*)params[1];
bool rval = true;
if (dcb->session->ses_id == *id)
@ -910,7 +918,7 @@ static bool ses_find_id(DCB *dcb, void *data)
return rval;
}
MXS_SESSION* session_get_by_id(int id)
MXS_SESSION* session_get_by_id(uint32_t id)
{
MXS_SESSION *session = NULL;
void *params[] = {&session, &id};
@ -981,3 +989,8 @@ void session_clear_stmt(MXS_SESSION *session)
session->stmt.buffer = NULL;
session->stmt.target = NULL;
}
uint32_t session_get_next_id()
{
return atomic_add_uint32(&next_session_id, 1);
}

View File

@ -229,7 +229,6 @@ int MySQLSendHandshake(DCB* dcb)
char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1] = "";
char *version_string;
int len_version_string = 0;
int id_num;
bool is_maria = false;
@ -275,10 +274,9 @@ int MySQLSendHandshake(DCB* dcb)
memcpy(mysql_filler_ten + 6, &new_flags, sizeof(new_flags));
}
// thread id, now put thePID
id_num = getpid() + dcb->fd;
gw_mysql_set_byte4(mysql_thread_id_num, id_num);
// Get the equivalent of the server process id.
protocol->tid = session_get_next_id();
gw_mysql_set_byte4(mysql_thread_id_num, protocol->tid);
memcpy(mysql_scramble_buf, server_scramble, 8);
memcpy(mysql_plugin_data, server_scramble + 8, 12);
@ -702,7 +700,8 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
* is changed so that future data will go through the
* normal data handling function instead of this one.
*/
MXS_SESSION *session = session_alloc(dcb->service, dcb);
MXS_SESSION *session =
session_alloc_with_id(dcb->service, dcb, protocol->tid);
if (session != NULL)
{

View File

@ -1724,7 +1724,7 @@ convert_arg(char *arg, int arg_type)
break;
case ARG_TYPE_SESSION:
rval = (unsigned long)session_get_by_id(strtol(arg, NULL, 0));
rval = (unsigned long)session_get_by_id(strtoul(arg, NULL, 0));
break;
case ARG_TYPE_MONITOR:

View File

@ -13,6 +13,8 @@
#include "schemarouter.hh"
#include <inttypes.h>
#include <maxscale/alloc.h>
#include <maxscale/query_classifier.h>
#include <maxscale/modutil.h>
@ -361,7 +363,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
if (m_config->debug)
{
sprintf(errbuf + strlen(errbuf),
" ([%lu]: DB change failed)",
" ([%" PRIu32 "]: DB change failed)",
m_client->session->ses_id);
}
@ -989,7 +991,7 @@ bool SchemaRouterSession::handle_default_db()
sprintf(errmsg, "Unknown database '%s'", m_connect_db.c_str());
if (m_config->debug)
{
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)",
sprintf(errmsg + strlen(errmsg), " ([%" PRIu32 "]: DB not found on connect)",
m_client->session->ses_id);
}
write_error_to_client(m_client,