Merge branch 'develop' into MXS-1075
This commit is contained in:
@ -12,6 +12,4 @@ add_subdirectory(readconnroute)
|
||||
add_subdirectory(readwritesplit)
|
||||
add_subdirectory(schemarouter)
|
||||
|
||||
if(BUILD_TESTS)
|
||||
add_subdirectory(testroute)
|
||||
endif()
|
||||
|
||||
|
@ -72,14 +72,14 @@ static const char* alter_table_regex =
|
||||
|
||||
/* The router entry points */
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static void clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *queue,
|
||||
static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message,
|
||||
static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *message,
|
||||
DCB *backend_dcb, mxs_error_action_t action, bool *succp);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
extern int MaxScaleUptime();
|
||||
@ -630,7 +630,7 @@ createInstance(SERVICE *service, char **options)
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
static MXS_ROUTER_SESSION *
|
||||
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
AVRO_INSTANCE *inst = (AVRO_INSTANCE *) instance;
|
||||
@ -703,7 +703,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
* @param router_cli_ses The particular session to free
|
||||
*
|
||||
*/
|
||||
static void freeSession(MXS_ROUTER* router_instance, void* router_client_ses)
|
||||
static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses)
|
||||
{
|
||||
AVRO_INSTANCE *router = (AVRO_INSTANCE *) router_instance;
|
||||
AVRO_CLIENT *client = (AVRO_CLIENT *) router_client_ses;
|
||||
@ -752,7 +752,7 @@ static void freeSession(MXS_ROUTER* router_instance, void* router_client_ses)
|
||||
* @param instance The router instance data
|
||||
* @param router_session The session being closed
|
||||
*/
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
{
|
||||
AVRO_INSTANCE *router = (AVRO_INSTANCE *) instance;
|
||||
AVRO_CLIENT *client = (AVRO_CLIENT *) router_session;
|
||||
@ -784,7 +784,7 @@ static void closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
* @return 1 on success, 0 on error
|
||||
*/
|
||||
static int
|
||||
routeQuery(MXS_ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
{
|
||||
AVRO_INSTANCE *router = (AVRO_INSTANCE *) instance;
|
||||
AVRO_CLIENT *client = (AVRO_CLIENT *) router_session;
|
||||
@ -960,7 +960,7 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
* @param queue The GWBUF with reply data
|
||||
*/
|
||||
static void
|
||||
clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb)
|
||||
clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue, DCB *backend_dcb)
|
||||
{
|
||||
/** We should never end up here */
|
||||
ss_dassert(false);
|
||||
@ -1002,7 +1002,7 @@ extract_message(GWBUF *errpkt)
|
||||
*
|
||||
*/
|
||||
static void
|
||||
errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb,
|
||||
errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *message, DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp)
|
||||
{
|
||||
|
@ -30,7 +30,7 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va
|
||||
|
||||
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create,
|
||||
avro_value_t *record, uint8_t *ptr,
|
||||
uint8_t *columns_present);
|
||||
uint8_t *columns_present, uint8_t *end);
|
||||
void notify_all_clients(AVRO_INSTANCE *router);
|
||||
void add_used_table(AVRO_INSTANCE* router, const char* table);
|
||||
|
||||
@ -309,9 +309,10 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
|
||||
{
|
||||
/** Add the current GTID and timestamp */
|
||||
uint8_t *end = ptr + hdr->event_size;
|
||||
int event_type = get_event_type(hdr->event_type);
|
||||
prepare_record(router, hdr, event_type, &record);
|
||||
ptr = process_row_event_data(map, create, &record, ptr, col_present);
|
||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||
avro_file_writer_append_value(table->avro_file, &record);
|
||||
|
||||
/** Update rows events have the before and after images of the
|
||||
@ -320,7 +321,7 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
if (event_type == UPDATE_EVENT)
|
||||
{
|
||||
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
|
||||
ptr = process_row_event_data(map, create, &record, ptr, col_present);
|
||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||
avro_file_writer_append_value(table->avro_file, &record);
|
||||
}
|
||||
|
||||
@ -497,7 +498,7 @@ int get_metadata_len(uint8_t type)
|
||||
* @return Pointer to the first byte after the current row event
|
||||
*/
|
||||
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value_t *record,
|
||||
uint8_t *ptr, uint8_t *columns_present)
|
||||
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
|
||||
{
|
||||
int npresent = 0;
|
||||
avro_value_t field;
|
||||
@ -507,10 +508,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
|
||||
/** BIT type values use the extra bits in the row event header */
|
||||
int extra_bits = (((ncolumns + 7) / 8) * 8) - ncolumns;
|
||||
ss_dassert(ptr < end);
|
||||
|
||||
/** Store the null value bitmap */
|
||||
uint8_t *null_bitmap = ptr;
|
||||
ptr += (ncolumns + 7) / 8;
|
||||
ss_dassert(ptr < end);
|
||||
|
||||
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
|
||||
{
|
||||
@ -544,6 +547,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
}
|
||||
avro_value_set_string(&field, strval);
|
||||
ptr += bytes;
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -553,6 +557,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
str[bytes] = '\0';
|
||||
avro_value_set_string(&field, str);
|
||||
ptr += bytes + 1;
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
}
|
||||
else if (column_is_bit(map->column_types[i]))
|
||||
@ -572,21 +577,36 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
}
|
||||
avro_value_set_int(&field, value);
|
||||
ptr += bytes;
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
else if (column_is_decimal(map->column_types[i]))
|
||||
{
|
||||
double f_value = 0.0;
|
||||
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
|
||||
avro_value_set_double(&field, f_value);
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
else if (column_is_variable_string(map->column_types[i]))
|
||||
{
|
||||
size_t sz;
|
||||
char *str = mxs_lestr_consume(&ptr, &sz);
|
||||
int bytes = metadata[metadata_offset] | metadata[metadata_offset + 1] << 8;
|
||||
if (bytes > 255)
|
||||
{
|
||||
sz = gw_mysql_get_byte2(ptr);
|
||||
ptr += 2;
|
||||
}
|
||||
else
|
||||
{
|
||||
sz = *ptr;
|
||||
ptr++;
|
||||
}
|
||||
|
||||
char buf[sz + 1];
|
||||
memcpy(buf, str, sz);
|
||||
memcpy(buf, ptr, sz);
|
||||
buf[sz] = '\0';
|
||||
ptr += sz;
|
||||
avro_value_set_string(&field, buf);
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
else if (column_is_blob(map->column_types[i]))
|
||||
{
|
||||
@ -596,6 +616,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
ptr += bytes;
|
||||
avro_value_set_bytes(&field, ptr, len);
|
||||
ptr += len;
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
else if (column_is_temporal(map->column_types[i]))
|
||||
{
|
||||
@ -604,6 +625,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm);
|
||||
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
|
||||
avro_value_set_string(&field, buf);
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
/** All numeric types (INT, LONG, FLOAT etc.) */
|
||||
else
|
||||
@ -613,6 +635,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
ptr += unpack_numeric_field(ptr, map->column_types[i],
|
||||
&metadata[metadata_offset], lval);
|
||||
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
ss_dassert(metadata_offset <= map->column_metadata_size);
|
||||
metadata_offset += get_metadata_len(map->column_types[i]);
|
||||
|
@ -449,6 +449,10 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
|
||||
while (*ptr && (isspace(*ptr) || (bt = *ptr == '`')))
|
||||
{
|
||||
ptr++;
|
||||
if (bt)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (strncasecmp(ptr, "constraint", 10) == 0 || strncasecmp(ptr, "index", 5) == 0 ||
|
||||
@ -481,11 +485,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
|
||||
/** Valid identifier */
|
||||
size_t bytes = ptr - start;
|
||||
|
||||
if (bt)
|
||||
{
|
||||
bytes--;
|
||||
}
|
||||
|
||||
memcpy(dest, start, bytes);
|
||||
dest[bytes] = '\0';
|
||||
|
||||
|
@ -90,21 +90,21 @@
|
||||
/* The router entry points */
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void free_instance(ROUTER_INSTANCE *instance);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static void clientReply(MXS_ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void errorReply(MXS_ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *message,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp);
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static void clientReply(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void errorReply(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *message,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp);
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
static int blr_handler_config(void *userdata, const char *section, const char *name, const char *value);
|
||||
@ -998,7 +998,7 @@ free_instance(ROUTER_INSTANCE *instance)
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
static MXS_ROUTER_SESSION *
|
||||
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
@ -1069,7 +1069,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
*
|
||||
*/
|
||||
static void freeSession(MXS_ROUTER* router_instance,
|
||||
void* router_client_ses)
|
||||
MXS_ROUTER_SESSION* router_client_ses)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)router_instance;
|
||||
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_client_ses;
|
||||
@ -1140,7 +1140,7 @@ static void freeSession(MXS_ROUTER* router_instance,
|
||||
* @param router_session The session being closed
|
||||
*/
|
||||
static void
|
||||
closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
||||
@ -1226,7 +1226,7 @@ closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
* @return The number of bytes sent
|
||||
*/
|
||||
static int
|
||||
routeQuery(MXS_ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
||||
@ -1732,7 +1732,7 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
* @param queue The GWBUF with reply data
|
||||
*/
|
||||
static void
|
||||
clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb)
|
||||
clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue, DCB *backend_dcb)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
|
||||
@ -1778,7 +1778,7 @@ extract_message(GWBUF *errpkt)
|
||||
*/
|
||||
static void
|
||||
errorReply(MXS_ROUTER *instance,
|
||||
void *router_session,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *message,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
|
@ -2487,6 +2487,7 @@ blr_file_write_master_config(ROUTER_INSTANCE *router, char *error)
|
||||
|
||||
if (chmod(tmp_file, S_IRUSR | S_IWUSR) < 0)
|
||||
{
|
||||
fclose(config_file);
|
||||
snprintf(error, BINLOG_ERROR_MSG_LEN, "%s, errno %u",
|
||||
strerror_r(errno, err_msg, sizeof(err_msg)), errno);
|
||||
return 2;
|
||||
|
@ -44,11 +44,11 @@
|
||||
|
||||
/* The router entry points */
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static int execute(MXS_ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
|
||||
extern int execute_cmd(CLI_SESSION *cli);
|
||||
@ -156,7 +156,7 @@ createInstance(SERVICE *service, char **options)
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
static MXS_ROUTER_SESSION *
|
||||
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
CLI_INSTANCE *inst = (CLI_INSTANCE *)instance;
|
||||
@ -188,7 +188,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
* @param router_session The session being closed
|
||||
*/
|
||||
static void
|
||||
closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
{
|
||||
CLI_INSTANCE *inst = (CLI_INSTANCE *)instance;
|
||||
CLI_SESSION *session = (CLI_SESSION *)router_session;
|
||||
@ -224,9 +224,8 @@ closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
* @param router_instance The router session
|
||||
* @param router_client_session The router session as returned from newSession
|
||||
*/
|
||||
static void freeSession(
|
||||
MXS_ROUTER* router_instance,
|
||||
void* router_client_session)
|
||||
static void freeSession(MXS_ROUTER* router_instance,
|
||||
MXS_ROUTER_SESSION* router_client_session)
|
||||
{
|
||||
MXS_FREE(router_client_session);
|
||||
return;
|
||||
@ -243,7 +242,7 @@ static void freeSession(
|
||||
* @return The number of bytes sent
|
||||
*/
|
||||
static int
|
||||
execute(MXS_ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
{
|
||||
CLI_SESSION *session = (CLI_SESSION *)router_session;
|
||||
|
||||
|
@ -43,11 +43,11 @@
|
||||
|
||||
/* The router entry points */
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static int execute(MXS_ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
|
||||
extern int execute_cmd(CLI_SESSION *cli);
|
||||
@ -147,7 +147,7 @@ createInstance(SERVICE *service, char **options)
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
static MXS_ROUTER_SESSION *
|
||||
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
CLI_INSTANCE *inst = (CLI_INSTANCE *)instance;
|
||||
@ -182,7 +182,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
* @param router_session The session being closed
|
||||
*/
|
||||
static void
|
||||
closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
{
|
||||
CLI_INSTANCE *inst = (CLI_INSTANCE *)instance;
|
||||
CLI_SESSION *session = (CLI_SESSION *)router_session;
|
||||
@ -218,9 +218,8 @@ closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
* @param router_instance The router session
|
||||
* @param router_client_session The router session as returned from newSession
|
||||
*/
|
||||
static void freeSession(
|
||||
MXS_ROUTER* router_instance,
|
||||
void* router_client_session)
|
||||
static void freeSession(MXS_ROUTER* router_instance,
|
||||
MXS_ROUTER_SESSION* router_client_session)
|
||||
{
|
||||
MXS_FREE(router_client_session);
|
||||
return;
|
||||
@ -237,7 +236,7 @@ static void freeSession(
|
||||
* @return The number of bytes sent
|
||||
*/
|
||||
static int
|
||||
execute(MXS_ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
{
|
||||
CLI_SESSION *session = (CLI_SESSION *)router_session;
|
||||
|
||||
|
@ -65,18 +65,18 @@ static int handle_url(INFO_INSTANCE *instance, INFO_SESSION *router_session, GWB
|
||||
|
||||
/* The router entry points */
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static int execute(MXS_ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp);
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp);
|
||||
|
||||
static SPINLOCK instlock;
|
||||
static INFO_INSTANCE *instances;
|
||||
@ -180,7 +180,7 @@ createInstance(SERVICE *service, char **options)
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
static MXS_ROUTER_SESSION *
|
||||
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
INFO_INSTANCE *inst = (INFO_INSTANCE *)instance;
|
||||
@ -212,7 +212,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
* @param router_session The session being closed
|
||||
*/
|
||||
static void
|
||||
closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
{
|
||||
INFO_INSTANCE *inst = (INFO_INSTANCE *)instance;
|
||||
INFO_SESSION *session = (INFO_SESSION *)router_session;
|
||||
@ -249,7 +249,7 @@ closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
* @param router_client_session The router session as returned from newSession
|
||||
*/
|
||||
static void freeSession(MXS_ROUTER* router_instance,
|
||||
void* router_client_session)
|
||||
MXS_ROUTER_SESSION* router_client_session)
|
||||
{
|
||||
MXS_FREE(router_client_session);
|
||||
return;
|
||||
@ -268,12 +268,12 @@ static void freeSession(MXS_ROUTER* router_instance,
|
||||
* @param succp Result of action: true iff router can continue
|
||||
*
|
||||
*/
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *backend_dcb,
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp)
|
||||
bool *succp)
|
||||
|
||||
{
|
||||
DCB *client_dcb;
|
||||
@ -316,7 +316,7 @@ static void handleError(MXS_ROUTER *instance,
|
||||
* @return The number of bytes sent
|
||||
*/
|
||||
static int
|
||||
execute(MXS_ROUTER *rinstance, void *router_session, GWBUF *queue)
|
||||
execute(MXS_ROUTER *rinstance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
{
|
||||
INFO_INSTANCE *instance = (INFO_INSTANCE *)rinstance;
|
||||
INFO_SESSION *session = (INFO_SESSION *)router_session;
|
||||
|
@ -89,14 +89,14 @@
|
||||
|
||||
/* The router entry points */
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, void *router_session, GWBUF *queue);
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static void clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *queue,
|
||||
static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void handleError(MXS_ROUTER *instance, void *router_session, GWBUF *errbuf,
|
||||
static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *errbuf,
|
||||
DCB *problem_dcb, mxs_error_action_t action, bool *succp);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses);
|
||||
@ -257,7 +257,7 @@ createInstance(SERVICE *service, char **options)
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
static MXS_ROUTER_SESSION *
|
||||
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
|
||||
@ -450,7 +450,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static void freeSession(MXS_ROUTER* router_instance, void* router_client_ses)
|
||||
static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses)
|
||||
{
|
||||
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *) router_instance;
|
||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *) router_client_ses;
|
||||
@ -469,7 +469,7 @@ static void freeSession(MXS_ROUTER* router_instance, void* router_client_ses)
|
||||
* @param router_session The session being closed
|
||||
*/
|
||||
static void
|
||||
closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
|
||||
DCB* backend_dcb;
|
||||
@ -537,7 +537,7 @@ static void log_closed_session(mysql_server_cmd_t mysql_command, bool is_closed,
|
||||
* @return if succeed 1, otherwise 0
|
||||
*/
|
||||
static int
|
||||
routeQuery(MXS_ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
|
||||
@ -656,7 +656,7 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
* @param queue The GWBUF with reply data
|
||||
*/
|
||||
static void
|
||||
clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb)
|
||||
clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue, DCB *backend_dcb)
|
||||
{
|
||||
ss_dassert(backend_dcb->session->client_dcb != NULL);
|
||||
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, queue);
|
||||
@ -675,7 +675,7 @@ clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *queue, DCB *backe
|
||||
* @param succp Result of action: true if router can continue
|
||||
*
|
||||
*/
|
||||
static void handleError(MXS_ROUTER *instance, void *router_session, GWBUF *errbuf,
|
||||
static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *errbuf,
|
||||
DCB *problem_dcb, mxs_error_action_t action, bool *succp)
|
||||
|
||||
{
|
||||
|
@ -69,14 +69,14 @@
|
||||
*/
|
||||
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *session);
|
||||
static int routeQuery(MXS_ROUTER *instance, void *session, GWBUF *queue);
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
|
||||
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
|
||||
static void clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *queue,
|
||||
static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue,
|
||||
DCB *backend_dcb);
|
||||
static void handleError(MXS_ROUTER *instance, void *router_session,
|
||||
static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *errmsgbuf, DCB *backend_dcb,
|
||||
mxs_error_action_t action, bool *succp);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
@ -250,7 +250,6 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options)
|
||||
return NULL;
|
||||
}
|
||||
router->service = service;
|
||||
spinlock_init(&router->lock);
|
||||
|
||||
/*
|
||||
* Until we know otherwise assume we have some available slaves.
|
||||
@ -313,7 +312,7 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options)
|
||||
* @param session The MaxScale session (generic connection data)
|
||||
* @return Session specific data for this session, i.e. a router session
|
||||
*/
|
||||
static void *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
|
||||
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)router_inst;
|
||||
ROUTER_CLIENT_SES *client_rses = (ROUTER_CLIENT_SES *)MXS_CALLOC(1, sizeof(ROUTER_CLIENT_SES));
|
||||
@ -331,7 +330,6 @@ static void *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
|
||||
client_rses->client_dcb = session->client_dcb;
|
||||
client_rses->have_tmp_tables = false;
|
||||
client_rses->forced_node = NULL;
|
||||
spinlock_init(&client_rses->rses_lock);
|
||||
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config));
|
||||
|
||||
int router_nservers = router->service->n_dbref;
|
||||
@ -404,12 +402,12 @@ static void *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
|
||||
* @param instance The router instance data
|
||||
* @param session The router session being closed
|
||||
*/
|
||||
static void closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
if (!router_cli_ses->rses_closed && rses_begin_locked_router_action(router_cli_ses))
|
||||
if (!router_cli_ses->rses_closed)
|
||||
{
|
||||
/**
|
||||
* Mark router session as closed. @c rses_closed is checked at the start
|
||||
@ -465,8 +463,6 @@ static void closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
}
|
||||
}
|
||||
|
||||
@ -480,7 +476,7 @@ static void closeSession(MXS_ROUTER *instance, void *router_session)
|
||||
* @param router_client_session Client session
|
||||
*
|
||||
*/
|
||||
static void freeSession(MXS_ROUTER *router_instance, void *router_client_session)
|
||||
static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_client_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
|
||||
|
||||
@ -553,7 +549,7 @@ void close_failed_bref(backend_ref_t *bref, bool fatal)
|
||||
* @param querybuf Buffer containing the query
|
||||
* @return 1 on success, 0 on error
|
||||
*/
|
||||
static int routeQuery(MXS_ROUTER *instance, void *router_session, GWBUF *querybuf)
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
|
||||
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *) router_session;
|
||||
@ -665,34 +661,27 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
* @param backend_dcb The backend DCB
|
||||
* @param queue The GWBUF with reply data
|
||||
*/
|
||||
static void clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *writebuf,
|
||||
static void clientReply(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *writebuf,
|
||||
DCB *backend_dcb)
|
||||
{
|
||||
DCB *client_dcb;
|
||||
ROUTER_INSTANCE *router_inst;
|
||||
ROUTER_CLIENT_SES *router_cli_ses;
|
||||
sescmd_cursor_t *scur = NULL;
|
||||
backend_ref_t *bref;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)instance;
|
||||
DCB *client_dcb = backend_dcb->session->client_dcb;
|
||||
|
||||
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
router_inst = (ROUTER_INSTANCE *)instance;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
/**
|
||||
* Lock router client session for secure read of router session members.
|
||||
* Note that this could be done without lock by using version #
|
||||
*/
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
if (router_cli_ses->rses_closed)
|
||||
{
|
||||
gwbuf_free(writebuf);
|
||||
goto lock_failed;
|
||||
return;
|
||||
}
|
||||
/** Holding lock ensures that router session remains open */
|
||||
ss_dassert(backend_dcb->session != NULL);
|
||||
client_dcb = backend_dcb->session->client_dcb;
|
||||
|
||||
/** Unlock */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
/**
|
||||
* 1. Check if backend received reply to sescmd.
|
||||
* 2. Check sescmd's state whether OK_PACKET has been
|
||||
@ -702,32 +691,10 @@ static void clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *write
|
||||
* 3. If reply for this sescmd is sent, lock property cursor
|
||||
* and
|
||||
*/
|
||||
if (client_dcb == NULL)
|
||||
{
|
||||
gwbuf_free(writebuf);
|
||||
/** Log that client was closed before reply */
|
||||
goto lock_failed;
|
||||
}
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
/** Log to debug that router was closed */
|
||||
goto lock_failed;
|
||||
}
|
||||
bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
|
||||
|
||||
#if !defined(FOR_BUG548_FIX_ONLY)
|
||||
/** This makes the issue becoming visible in poll.c */
|
||||
if (bref == NULL)
|
||||
{
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
goto lock_failed;
|
||||
}
|
||||
#endif
|
||||
|
||||
backend_ref_t *bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
|
||||
CHK_BACKEND_REF(bref);
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
sescmd_cursor_t *scur = &bref->bref_sescmd_cur;
|
||||
|
||||
/** Statement was successfully executed, free the stored statement */
|
||||
session_clear_stmt(backend_dcb->session);
|
||||
@ -788,15 +755,7 @@ static void clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *write
|
||||
/** Write reply to client DCB */
|
||||
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
|
||||
}
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
/** Log to debug that router was closed */
|
||||
goto lock_failed;
|
||||
}
|
||||
/** There is one pending session command to be executed. */
|
||||
if (sescmd_cursor_is_active(scur))
|
||||
{
|
||||
@ -847,11 +806,6 @@ static void clientReply(MXS_ROUTER *instance, void *router_session, GWBUF *write
|
||||
gwbuf_free(bref->bref_pending_cmd);
|
||||
bref->bref_pending_cmd = NULL;
|
||||
}
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
lock_failed:
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -876,64 +830,6 @@ static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
* functions are not intended for use outside the read write split router.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Acquires lock to router client session if it is not closed.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - in, use
|
||||
*
|
||||
*
|
||||
* @return true if router session was not closed. If return value is true
|
||||
* it means that router is locked, and must be unlocked later. False, if
|
||||
* router was closed before lock was acquired.
|
||||
*
|
||||
*/
|
||||
bool rses_begin_locked_router_action(ROUTER_CLIENT_SES *rses)
|
||||
{
|
||||
bool succp = false;
|
||||
|
||||
if (rses == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
|
||||
goto return_succp;
|
||||
}
|
||||
spinlock_acquire(&rses->rses_lock);
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
spinlock_release(&rses->rses_lock);
|
||||
goto return_succp;
|
||||
}
|
||||
succp = true;
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
/** to be inline'd */
|
||||
|
||||
/**
|
||||
* @brief Releases router client session lock.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - <usage>
|
||||
* <description>
|
||||
*
|
||||
* @return void
|
||||
*
|
||||
*/
|
||||
void rses_end_locked_router_action(ROUTER_CLIENT_SES *rses)
|
||||
{
|
||||
CHK_CLIENT_RSES(rses);
|
||||
spinlock_release(&rses->rses_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* @brief Clear one or more bits in the backend reference state
|
||||
*
|
||||
@ -1135,28 +1031,21 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses)
|
||||
*/
|
||||
backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
|
||||
{
|
||||
backend_ref_t *bref;
|
||||
int i = 0;
|
||||
ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
|
||||
CHK_DCB(dcb);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
bref = rses->rses_backend_ref;
|
||||
|
||||
while (i < rses->rses_nbackends)
|
||||
for (int i = 0; i < rses->rses_nbackends; i++)
|
||||
{
|
||||
if (bref->bref_dcb == dcb)
|
||||
if (rses->rses_backend_ref[i].bref_dcb == dcb)
|
||||
{
|
||||
break;
|
||||
return &rses->rses_backend_ref[i];
|
||||
}
|
||||
bref++;
|
||||
i += 1;
|
||||
}
|
||||
|
||||
if (i == rses->rses_nbackends)
|
||||
{
|
||||
bref = NULL;
|
||||
}
|
||||
return bref;
|
||||
/** We should always have a valid backend reference */
|
||||
ss_dassert(false);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1284,17 +1173,19 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
|
||||
* Even if succp == true connecting to new slave may have failed. succp is to
|
||||
* tell whether router has enough master/slave connections to continue work.
|
||||
*/
|
||||
static void handleError(MXS_ROUTER *instance, void *router_session,
|
||||
GWBUF *errmsgbuf, DCB *problem_dcb,
|
||||
mxs_error_action_t action, bool *succp)
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *errmsgbuf,
|
||||
DCB *problem_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp)
|
||||
{
|
||||
MXS_SESSION *session;
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
CHK_DCB(problem_dcb);
|
||||
|
||||
if (!rses_begin_locked_router_action(rses))
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
/** Session is already closed */
|
||||
problem_dcb->dcb_errhandle_called = true;
|
||||
@ -1311,32 +1202,24 @@ static void handleError(MXS_ROUTER *instance, void *router_session,
|
||||
* be safe with the code as it stands on 9 Sept 2015 - MNB
|
||||
*/
|
||||
*succp = true;
|
||||
rses_end_locked_router_action(rses);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
problem_dcb->dcb_errhandle_called = true;
|
||||
}
|
||||
session = problem_dcb->session;
|
||||
|
||||
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
|
||||
MXS_SESSION *session = problem_dcb->session;
|
||||
ss_dassert(session);
|
||||
|
||||
if (session == NULL)
|
||||
{
|
||||
MXS_ERROR("Session of DCB %p is NULL, won't close the DCB.", problem_dcb);
|
||||
ss_dassert(false);
|
||||
*succp = false;
|
||||
}
|
||||
else if (DCB_ROLE_CLIENT_HANDLER == problem_dcb->dcb_role)
|
||||
if (problem_dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
|
||||
{
|
||||
dcb_close(problem_dcb);
|
||||
*succp = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
CHK_SESSION(session);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
|
||||
|
||||
switch (action)
|
||||
{
|
||||
@ -1449,8 +1332,6 @@ static void handleError(MXS_ROUTER *instance, void *router_session,
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(rses);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1570,7 +1451,6 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
bool succp;
|
||||
|
||||
myrses = *rses;
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&myrses->rses_lock));
|
||||
|
||||
ses = backend_dcb->session;
|
||||
CHK_SESSION(ses);
|
||||
|
@ -307,7 +307,6 @@ struct router_client_session
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t rses_chk_top;
|
||||
#endif
|
||||
SPINLOCK rses_lock; /*< protects rses_deleted */
|
||||
bool rses_closed; /*< true when closeSession is called */
|
||||
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; /*< Properties listed by their type */
|
||||
backend_ref_t* rses_master_ref;
|
||||
@ -349,7 +348,6 @@ typedef struct
|
||||
typedef struct router_instance
|
||||
{
|
||||
SERVICE* service; /*< Pointer to service */
|
||||
SPINLOCK lock; /*< Lock for the instance data */
|
||||
rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */
|
||||
int rwsplit_version; /*< version number for router's config */
|
||||
ROUTER_STATS stats; /*< Statistics for this router */
|
||||
|
@ -56,7 +56,6 @@ bool handle_target_is_all(route_target_t route_target,
|
||||
GWBUF *querybuf, int packet_type, qc_query_type_t qtype);
|
||||
int determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
|
||||
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype);
|
||||
void session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t qtype);
|
||||
bool is_packet_a_one_way_message(int packet_type);
|
||||
sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref);
|
||||
bool is_packet_a_query(int packet_type);
|
||||
@ -65,8 +64,6 @@ bool send_readonly_error(DCB *dcb);
|
||||
/*
|
||||
* The following are implemented in readwritesplit.c
|
||||
*/
|
||||
bool rses_begin_locked_router_action(ROUTER_CLIENT_SES *rses);
|
||||
void rses_end_locked_router_action(ROUTER_CLIENT_SES *rses);
|
||||
void bref_clear_state(backend_ref_t *bref, bref_state_t state);
|
||||
void bref_set_state(backend_ref_t *bref, bref_state_t state);
|
||||
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data);
|
||||
|
@ -287,32 +287,6 @@ handle_target_is_all(route_target_t route_target,
|
||||
return result;
|
||||
}
|
||||
|
||||
/* This is MySQL specific */
|
||||
/**
|
||||
* @brief Write an error message to the log indicating failure
|
||||
*
|
||||
* Used when an attempt to lock the router session fails.
|
||||
*
|
||||
* @param querybuf Query buffer containing packet
|
||||
* @param packet_type Integer (enum) indicating type of packet
|
||||
* @param qtype Query type
|
||||
*/
|
||||
void
|
||||
session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t qtype)
|
||||
{
|
||||
if (packet_type != MYSQL_COM_QUIT)
|
||||
{
|
||||
/* NOTE: modutil_get_query is MySQL specific */
|
||||
char *query_str = modutil_get_query(querybuf);
|
||||
|
||||
MXS_ERROR("Can't route %s:%s:\"%s\" to "
|
||||
"backend server. Router is closed.",
|
||||
STRPACKETTYPE(packet_type), STRQTYPE(qtype),
|
||||
(query_str == NULL ? "(empty)" : query_str));
|
||||
MXS_FREE(query_str);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Probably MySQL specific because of modutil function
|
||||
*/
|
||||
|
@ -70,16 +70,10 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
/* packet_type is a problem as it is MySQL specific */
|
||||
packet_type = determine_packet_type(querybuf, &non_empty_packet);
|
||||
qtype = determine_query_type(querybuf, packet_type, non_empty_packet);
|
||||
|
||||
if (non_empty_packet)
|
||||
{
|
||||
/** This might not be absolutely necessary as some parts of the code
|
||||
* can only be executed by one thread at a time. */
|
||||
if (!rses_begin_locked_router_action(rses))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
handle_multi_temp_and_load(rses, querybuf, packet_type, (int *)&qtype);
|
||||
rses_end_locked_router_action(rses);
|
||||
|
||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
||||
{
|
||||
@ -116,7 +110,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
{
|
||||
succp = handle_target_is_all(route_target, inst, rses, querybuf, packet_type, qtype);
|
||||
}
|
||||
else if (rses_begin_locked_router_action(rses))
|
||||
else
|
||||
{
|
||||
/* Now we have a lock on the router session */
|
||||
bool store_stmt = false;
|
||||
@ -152,12 +146,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
|
||||
handle_got_target(inst, rses, querybuf, target_dcb, store_stmt);
|
||||
}
|
||||
rses_end_locked_router_action(rses);
|
||||
}
|
||||
else
|
||||
{
|
||||
session_lock_failure_handling(querybuf, packet_type, qtype);
|
||||
succp = false;
|
||||
}
|
||||
|
||||
return succp;
|
||||
@ -214,12 +202,6 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
|
||||
{
|
||||
int rc;
|
||||
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
goto return_succp;
|
||||
}
|
||||
|
||||
for (i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||
{
|
||||
DCB *dcb = backend_ref[i].bref_dcb;
|
||||
@ -244,15 +226,9 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
|
||||
}
|
||||
}
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
gwbuf_free(querybuf);
|
||||
goto return_succp;
|
||||
}
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
goto return_succp;
|
||||
}
|
||||
|
||||
if (router_cli_ses->rses_nbackends <= 0)
|
||||
{
|
||||
@ -318,7 +294,6 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
|
||||
if ((prop = rses_property_init(RSES_PROP_TYPE_SESCMD)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Router session property initialization failed");
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -328,7 +303,6 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
|
||||
if (rses_property_add(router_cli_ses, prop) != 0)
|
||||
{
|
||||
MXS_ERROR("Session property addition failed.");
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -387,9 +361,6 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
|
||||
|
||||
atomic_add(&router_cli_ses->rses_nsescmd, 1);
|
||||
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
return_succp:
|
||||
/**
|
||||
* Routing must succeed to all backends that are used.
|
||||
@ -1364,7 +1335,6 @@ int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop)
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
CHK_RSES_PROP(prop);
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
||||
|
||||
prop->rses_prop_rsession = rses;
|
||||
p = rses->rses_properties[prop->rses_prop_type];
|
||||
|
@ -60,8 +60,6 @@ mysql_sescmd_t *rses_property_get_sescmd(rses_property_t *prop)
|
||||
}
|
||||
|
||||
CHK_RSES_PROP(prop);
|
||||
ss_dassert(prop->rses_prop_rsession == NULL ||
|
||||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
|
||||
|
||||
sescmd = &prop->rses_prop_data.sescmd;
|
||||
|
||||
@ -134,14 +132,9 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
|
||||
backend_ref_t *bref,
|
||||
bool *reconnect)
|
||||
{
|
||||
mysql_sescmd_t *scmd;
|
||||
sescmd_cursor_t *scur;
|
||||
ROUTER_CLIENT_SES *ses;
|
||||
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
||||
scmd = sescmd_cursor_get_command(scur);
|
||||
ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession;
|
||||
sescmd_cursor_t *scur = &bref->bref_sescmd_cur;
|
||||
mysql_sescmd_t *scmd = sescmd_cursor_get_command(scur);
|
||||
ROUTER_CLIENT_SES *ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession;
|
||||
CHK_GWBUF(replybuf);
|
||||
|
||||
/**
|
||||
@ -273,7 +266,6 @@ mysql_sescmd_t *sescmd_cursor_get_command(sescmd_cursor_t *scur)
|
||||
{
|
||||
mysql_sescmd_t *scmd;
|
||||
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
||||
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
|
||||
|
||||
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
|
||||
@ -293,7 +285,6 @@ bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor)
|
||||
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
|
||||
return false;
|
||||
}
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
|
||||
|
||||
succp = sescmd_cursor->scmd_cur_active;
|
||||
return succp;
|
||||
@ -303,7 +294,6 @@ bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor)
|
||||
void sescmd_cursor_set_active(sescmd_cursor_t *sescmd_cursor,
|
||||
bool value)
|
||||
{
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
|
||||
/** avoid calling unnecessarily */
|
||||
ss_dassert(sescmd_cursor->scmd_cur_active != value);
|
||||
sescmd_cursor->scmd_cur_active = value;
|
||||
@ -420,8 +410,6 @@ static bool sescmd_cursor_next(sescmd_cursor_t *scur)
|
||||
|
||||
ss_dassert(scur != NULL);
|
||||
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(
|
||||
&(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock));
|
||||
|
||||
/** Illegal situation */
|
||||
if (scur == NULL || *scur->scmd_cur_ptr_property == NULL ||
|
||||
|
@ -58,23 +58,23 @@
|
||||
*/
|
||||
|
||||
static MXS_ROUTER* createInstance(SERVICE *service, char **options);
|
||||
static void* newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *session);
|
||||
static int routeQuery(MXS_ROUTER *instance, void *session, GWBUF *queue);
|
||||
static void diagnostic(MXS_ROUTER *instance, DCB *dcb);
|
||||
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
|
||||
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
|
||||
static void diagnostic(MXS_ROUTER *instance, DCB *dcb);
|
||||
|
||||
static void clientReply(MXS_ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* queue,
|
||||
DCB* backend_dcb);
|
||||
MXS_ROUTER_SESSION* router_session,
|
||||
GWBUF* queue,
|
||||
DCB* backend_dcb);
|
||||
|
||||
static void handleError(MXS_ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* errmsgbuf,
|
||||
DCB* backend_dcb,
|
||||
static void handleError(MXS_ROUTER* instance,
|
||||
MXS_ROUTER_SESSION* router_session,
|
||||
GWBUF* errmsgbuf,
|
||||
DCB* backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool* succp);
|
||||
bool* succp);
|
||||
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
|
||||
|
||||
static route_target_t get_shard_route_target(qc_query_type_t qtype,
|
||||
@ -847,7 +847,7 @@ enum shard_map_state shard_map_update_state(shard_map_t *self, ROUTER_INSTANCE*
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session)
|
||||
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session)
|
||||
{
|
||||
backend_ref_t* backend_ref; /*< array of backend references (DCB, BACKEND, cursor) */
|
||||
ROUTER_CLIENT_SES* client_rses = NULL;
|
||||
@ -1040,7 +1040,7 @@ static void* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session)
|
||||
* @param instance The router instance data
|
||||
* @param session The session being closed
|
||||
*/
|
||||
static void closeSession(MXS_ROUTER* instance, void* router_session)
|
||||
static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES* router_cli_ses;
|
||||
ROUTER_INSTANCE* inst;
|
||||
@ -1135,7 +1135,7 @@ static void closeSession(MXS_ROUTER* instance, void* router_session)
|
||||
}
|
||||
}
|
||||
|
||||
static void freeSession(MXS_ROUTER* router_instance, void* router_client_session)
|
||||
static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
|
||||
|
||||
@ -1612,7 +1612,7 @@ bool send_database_list(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
|
||||
*
|
||||
*/
|
||||
static int routeQuery(MXS_ROUTER* instance,
|
||||
void* router_session,
|
||||
MXS_ROUTER_SESSION* router_session,
|
||||
GWBUF* qbuf)
|
||||
{
|
||||
qc_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
@ -2238,7 +2238,7 @@ static void diagnostic(MXS_ROUTER *instance, DCB *dcb)
|
||||
* @param queue The GWBUF with reply data
|
||||
*/
|
||||
static void clientReply(MXS_ROUTER* instance,
|
||||
void* router_session,
|
||||
MXS_ROUTER_SESSION* router_session,
|
||||
GWBUF* buffer,
|
||||
DCB* backend_dcb)
|
||||
{
|
||||
@ -3550,12 +3550,12 @@ return_succp:
|
||||
* Even if succp == true connecting to new slave may have failed. succp is to
|
||||
* tell whether router has enough master/slave connections to continue work.
|
||||
*/
|
||||
static void handleError(MXS_ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* errmsgbuf,
|
||||
DCB* problem_dcb,
|
||||
static void handleError(MXS_ROUTER* instance,
|
||||
MXS_ROUTER_SESSION* router_session,
|
||||
GWBUF* errmsgbuf,
|
||||
DCB* problem_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool* succp)
|
||||
bool* succp)
|
||||
{
|
||||
MXS_SESSION* session;
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
|
@ -1,4 +0,0 @@
|
||||
add_library(testroute SHARED testroute.c)
|
||||
target_link_libraries(testroute maxscale-common)
|
||||
set_target_properties(testroute PROPERTIES VERSION "1.0.0")
|
||||
install_module(testroute core)
|
@ -1,167 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
#include <stdio.h>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/router.h>
|
||||
#include <maxscale/modinfo.h>
|
||||
|
||||
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
|
||||
static void *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
|
||||
static void closeSession(MXS_ROUTER *instance, void *session);
|
||||
static void freeSession(MXS_ROUTER *instance, void *session);
|
||||
static int routeQuery(MXS_ROUTER *instance, void *session, GWBUF *queue);
|
||||
static void clientReply(MXS_ROUTER *instance, void *session, GWBUF *queue, DCB*);
|
||||
static void diagnostic(MXS_ROUTER *instance, DCB *dcb);
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance);
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp);
|
||||
|
||||
typedef struct
|
||||
{
|
||||
} TESTROUTER;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
} TESTSESSION;
|
||||
|
||||
/**
|
||||
* The module entry point routine. It is this routine that
|
||||
* must populate the structure that is referred to as the
|
||||
* "module object", this is a structure with the set of
|
||||
* external entry points for this module.
|
||||
*
|
||||
* @return The module object
|
||||
*/
|
||||
MXS_MODULE* MXS_CREATE_MODULE()
|
||||
{
|
||||
static MXS_ROUTER_OBJECT MyObject =
|
||||
{
|
||||
createInstance,
|
||||
newSession,
|
||||
closeSession,
|
||||
freeSession,
|
||||
routeQuery,
|
||||
diagnostic,
|
||||
clientReply,
|
||||
handleError,
|
||||
getCapabilities,
|
||||
NULL
|
||||
};
|
||||
|
||||
static MXS_MODULE info =
|
||||
{
|
||||
MXS_MODULE_API_ROUTER,
|
||||
MXS_MODULE_IN_DEVELOPMENT,
|
||||
MXS_ROUTER_VERSION,
|
||||
"A test router - not for use in real systems",
|
||||
"V1.0.0",
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
NULL, /* Thread init. */
|
||||
NULL, /* Thread finish. */
|
||||
{
|
||||
{MXS_END_MODULE_PARAMS}
|
||||
}
|
||||
};
|
||||
|
||||
return &info;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance of the router for a particular service
|
||||
* within the gateway.
|
||||
*
|
||||
* @param service The service this router is being create for
|
||||
* @param options The options for this query router
|
||||
*
|
||||
* @return The instance data for this new instance
|
||||
*/
|
||||
static MXS_ROUTER *
|
||||
createInstance(SERVICE *service, char **options)
|
||||
{
|
||||
return (MXS_ROUTER*)MXS_MALLOC(sizeof(TESTROUTER));
|
||||
}
|
||||
|
||||
/**
|
||||
* Associate a new session with this instance of the router.
|
||||
*
|
||||
* @param instance The router instance data
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
return (MXS_SESSION*)MXS_MALLOC(sizeof(TESTSESSION));
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a session with the router, this is the mechanism
|
||||
* by which a router may cleanup data structure etc.
|
||||
*
|
||||
* @param instance The router instance data
|
||||
* @param session The session being closed
|
||||
*/
|
||||
static void
|
||||
closeSession(MXS_ROUTER *instance, void *session)
|
||||
{
|
||||
}
|
||||
|
||||
static void freeSession(
|
||||
MXS_ROUTER* router_instance,
|
||||
void* router_client_session)
|
||||
{
|
||||
MXS_FREE(router_client_session);
|
||||
}
|
||||
|
||||
static int
|
||||
routeQuery(MXS_ROUTER *instance, void *session, GWBUF *queue)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
void clientReply(MXS_ROUTER* instance, void* session, GWBUF* queue, DCB* dcb)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Diagnostics routine
|
||||
*
|
||||
* @param instance The router instance
|
||||
* @param dcb The DCB for diagnostic output
|
||||
*/
|
||||
static void
|
||||
diagnostic(MXS_ROUTER *instance, DCB *dcb)
|
||||
{
|
||||
}
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void handleError(
|
||||
MXS_ROUTER *instance,
|
||||
void *router_session,
|
||||
GWBUF *errbuf,
|
||||
DCB *backend_dcb,
|
||||
mxs_error_action_t action,
|
||||
bool *succp)
|
||||
{
|
||||
}
|
Reference in New Issue
Block a user