diff --git a/server/core/buffer.c b/server/core/buffer.c index e3c649f69..57228b253 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -81,6 +81,7 @@ SHARED_BUF *sbuf; sbuf->refcount = 1; rval->sbuf = sbuf; rval->next = NULL; + rval->gwbuf_type = GWBUF_TYPE_UNDEFINED; rval->command = 0; CHK_GWBUF(rval); return rval; @@ -127,6 +128,7 @@ GWBUF *rval; rval->sbuf = buf->sbuf; rval->start = buf->start; rval->end = buf->end; + rval->gwbuf_type = buf->gwbuf_type; rval->next = NULL; // rval->command = buf->command; CHK_GWBUF(rval); @@ -152,12 +154,67 @@ GWBUF *gwbuf_clone_portion( clonebuf->sbuf = buf->sbuf; clonebuf->start = (void *)((char*)buf->start)+start_offset; clonebuf->end = (void *)((char *)clonebuf->start)+length; + clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */ clonebuf->next = NULL; CHK_GWBUF(clonebuf); return clonebuf; } +/** + * Returns pointer to GWBUF of a requested type. + * As of 10.3.14 only MySQL to plain text conversion is supported. + * Return NULL if conversion between types is not supported or due lacking + * type information. + */ +GWBUF *gwbuf_clone_transform( + GWBUF * head, + gwbuf_type_t targettype) +{ + gwbuf_type_t src_type; + GWBUF* clonebuf; + + CHK_GWBUF(head); + src_type = head->gwbuf_type; + + if (targettype == GWBUF_TYPE_UNDEFINED || + src_type == GWBUF_TYPE_UNDEFINED || + src_type == GWBUF_TYPE_PLAINSQL || + targettype == src_type) + { + clonebuf = NULL; + goto return_clonebuf; + } + + switch (src_type) + { + case GWBUF_TYPE_MYSQL: + if (targettype == GWBUF_TYPE_PLAINSQL) + { + /** Crete reference to string part of buffer */ + clonebuf = gwbuf_clone_portion( + head, + 5, + GWBUF_LENGTH(head)-5); + ss_dassert(clonebuf != NULL); + /** Overwrite the type with new format */ + clonebuf->gwbuf_type = targettype; + } + else + { + clonebuf = NULL; + } + break; + + default: + clonebuf = NULL; + break; + } /*< switch (src_type) */ + +return_clonebuf: + return clonebuf; +} + /** * Append a buffer onto a linked list of buffer structures. @@ -234,3 +291,28 @@ int rval = 0; } return rval; } + +bool gwbuf_set_type( + GWBUF* buf, + gwbuf_type_t type) +{ + bool succp; + CHK_GWBUF(buf); + + switch (type) { + case GWBUF_TYPE_MYSQL: + case GWBUF_TYPE_PLAINSQL: + case GWBUF_TYPE_UNDEFINED: + buf->gwbuf_type = type; + succp = true; + break; + default: + succp = false; + break; + } + ss_dassert(succp); + return succp; +} + + + diff --git a/server/core/dcb.c b/server/core/dcb.c index fcf2e0cf2..db9e96b60 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1357,19 +1357,19 @@ int gw_write( w = write(fd, buf, nbytes); #endif /* SS_DEBUG && SS_TEST */ -#if defined(SS_DEBUG) +#if defined(SS_DEBUG_MYSQL) { - size_t len; - unsigned char* packet = (unsigned char *)buf; - char* str; + size_t len; + uint8_t* packet = (uint8_t *)buf; + char* str; /** Print only MySQL packets */ if (w > 5) { str = (char *)&packet[5]; len = packet[0]; - len += 255*packet[1]; - len += 255*255*packet[2]; + len += 256*packet[1]; + len += 256*256*packet[2]; if (strncmp(str, "insert", 6) == 0 || strncmp(str, "create", 6) == 0 || @@ -1385,8 +1385,8 @@ int gw_write( if (nbytes-5 > len) { size_t len2 = packet[4+len]; - len2 += 255*packet[4+len+1]; - len2 += 255*255*packet[4+len+2]; + len2 += 256*packet[4+len+1]; + len2 += 256*256*packet[4+len+2]; char* str2 = (char *)&packet[4+len+5]; snprintf(s, 5+len+len2, "long %s %s", (char *)str, (char *)str2); diff --git a/server/include/buffer.h b/server/include/buffer.h index de5a13148..53c81a4a6 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -41,9 +41,18 @@ * * @endverbatim */ +#include + + +typedef enum +{ + GWBUF_TYPE_UNDEFINED = 0x0, + GWBUF_TYPE_PLAINSQL = 0x1, + GWBUF_TYPE_MYSQL = 0x2 +} gwbuf_type_t; /** - * A structure to encapsualte the data in a form that the data itself can be + * A structure to encapsulate the data in a form that the data itself can be * shared between multiple GWBUF's without the need to make multiple copies * but still maintain separate data pointers. */ @@ -64,8 +73,9 @@ typedef struct gwbuf { struct gwbuf *next; /*< Next buffer in a linked chain of buffers */ void *start; /*< Start of the valid data */ void *end; /*< First byte after the valid data */ - SHARED_BUF *sbuf; /*< The shared buffer with the real data */ + SHARED_BUF *sbuf; /*< The shared buffer with the real data */ int command;/*< The command type for the queue */ + gwbuf_type_t gwbuf_type; /*< buffer's data type information */ } GWBUF; /*< @@ -83,6 +93,7 @@ typedef struct gwbuf { /*< Consume a number of bytes in the buffer */ #define GWBUF_CONSUME(b, bytes) (b)->start += bytes +#define GWBUF_TYPE(b) (b)->gwbuf_type /*< * Function prototypes for the API to maniplate the buffers */ @@ -93,5 +104,6 @@ extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail); extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length); extern unsigned int gwbuf_length(GWBUF *head); extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len); - +extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type); +extern bool gwbuf_set_type(GWBUF *head, gwbuf_type_t type); #endif diff --git a/server/include/dcb.h b/server/include/dcb.h index cacf6527e..29ab643be 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -87,7 +87,6 @@ typedef struct gw_protocol { int (*listen)(struct dcb *, char *); int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *); int (*session)(struct dcb *, void *); - void* (*getstmt)(void* buf); } GWPROTOCOL; /** diff --git a/server/include/router.h b/server/include/router.h index 9c93f3271..004f91d51 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -35,6 +35,7 @@ #include #include #include +#include /** * The ROUTER handle points to module specific data, so the best we can do @@ -74,10 +75,13 @@ typedef struct router_object { void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); void (*errorReply)(ROUTER* instance, void* router_session, char* message, DCB *backend_dcb, int action); + uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); } ROUTER_OBJECT; -/* Router commands */ -#define ROUTER_DEFAULT 0 /**< Standard routing */ -#define ROUTER_CHANGE_SESSION 1 /**< Route a change session */ +typedef enum router_capability_t { + RCAP_TYPE_UNDEFINED = 0, + RCAP_TYPE_STMT_INPUT = (1 << 0), + RCAP_TYPE_PACKET_INPUT = (1 << 1) +} router_capability_t; #endif diff --git a/server/modules/include/readconnection.h b/server/modules/include/readconnection.h index 70f6de9cd..c6f19995c 100644 --- a/server/modules/include/readconnection.h +++ b/server/modules/include/readconnection.h @@ -53,8 +53,8 @@ typedef struct router_client_session { bool rses_closed; /*< true when closeSession is called */ BACKEND *backend; /*< Backend used by the client session */ DCB *backend_dcb; /*< DCB Connection to the backend */ - struct router_client_session - *next; + struct router_client_session *next; + int rses_capabilities; /*< input type, for example */ #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 95b414958..c5dcbdaaa 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -66,7 +66,6 @@ typedef struct mysql_sescmd_st { #if defined(SS_DEBUG) skygw_chk_t my_sescmd_chk_top; #endif -// ROUTER_CLIENT_SES* my_sescmd_rsession; /*< parent router session */ rses_property_t* my_sescmd_prop; /*< parent property */ GWBUF* my_sescmd_buf; /*< query buffer */ bool my_sescmd_is_replied; /*< is cmd replied to client */ @@ -84,7 +83,6 @@ struct rses_property_st { skygw_chk_t rses_prop_chk_top; #endif ROUTER_CLIENT_SES* rses_prop_rsession; /*< parent router session */ -// SPINLOCK rses_prop_lock; /*< protect property content */ int rses_prop_refcount; rses_property_type_t rses_prop_type; union rses_prop_data { @@ -121,6 +119,7 @@ struct router_client_session { DCB* rses_dcb[BE_COUNT]; /*< cursor is pointer and status variable to current session command */ sescmd_cursor_t rses_cursor[BE_COUNT]; + int rses_capabilities; /*< input type, for example */ struct router_client_session* next; #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 06d6e3a85..03e433062 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -856,7 +856,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB backend_protocol = backend->protocol; client_protocol = in_session->client->protocol; - queue->command = ROUTER_CHANGE_SESSION; +// queue->command = ROUTER_CHANGE_SESSION; // now get the user, after 4 bytes header and 1 byte command client_auth_packet += 5; @@ -939,7 +939,7 @@ static int gw_session(DCB *backend_dcb, void *data) { GWBUF *queue = NULL; queue = (GWBUF *) data; - queue->command = ROUTER_CHANGE_SESSION; +// queue->command = ROUTER_CHANGE_SESSION; backend_dcb->func.write(backend_dcb, queue); return 1; diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index c2f50a76a..255ede201 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -48,11 +48,16 @@ static int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue); static int gw_error_client_event(DCB *dcb); static int gw_client_close(DCB *dcb); static int gw_client_hangup_event(DCB *dcb); -static void* gw_MySQL_get_next_stmt(void* buffer); int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message); int MySQLSendHandshake(DCB* dcb); static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue); +static int route_by_statement( + ROUTER* router_instance, + ROUTER_OBJECT* router, + void* rsession, + GWBUF* read_buf); +static GWBUF* gw_MySQL_get_next_stmt(GWBUF** buffer); /* * The "module object" for the mysqld client protocol module. @@ -68,8 +73,7 @@ static GWPROTOCOL MyObject = { gw_client_close, /* Close */ gw_MySQLListener, /* Listen */ NULL, /* Authentication */ - NULL, /* Session */ - gw_MySQL_get_next_stmt /* get single stmt from read buf */ + NULL /* Session */ }; /** @@ -609,9 +613,11 @@ int gw_read_client_event(DCB* dcb) { */ { int len = -1; + uint8_t cap = 0; GWBUF *read_buffer = NULL; uint8_t *ptr_buff = NULL; int mysql_command = -1; + bool stmt_input; /*< router input type */ session = dcb->session; @@ -640,7 +646,7 @@ int gw_read_client_event(DCB* dcb) { /* get mysql commang at fifth byte */ if (ptr_buff) { mysql_command = ptr_buff[4]; - } + } /** * Without rsession there is no access to backend. * COM_QUIT : close client dcb @@ -672,6 +678,32 @@ int gw_read_client_event(DCB* dcb) { read_buffer = gwbuf_consume(read_buffer, len); goto return_rc; } + /** Ask what type of input the router expects */ + cap = router->getCapabilities(router_instance, rsession); + + if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT)) + { + stmt_input = false; + } + else if (cap == RCAP_TYPE_STMT_INPUT) + { + stmt_input = true; + /** Mark buffer to as MySQL type */ + gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); + } + else + { + mysql_send_custom_error(dcb, + 1, + 0, + "Reading router capabilities " + "failed. Router session is " + "closed."); + rc = 1; + goto return_rc; + } + + /** Route COM_QUIT to backend */ if (mysql_command == '\x01') { router->routeQuery(router_instance, rsession, read_buffer); @@ -690,10 +722,25 @@ int gw_read_client_event(DCB* dcb) { } else { - /** Route other commands to backend */ - rc = router->routeQuery(router_instance, + if (stmt_input) + { + /** + * Feed each statement completely and separately + * to router. + */ + rc = route_by_statement(router_instance, + router, + rsession, + read_buffer); + } + else + { + /** Feed whole packet to router */ + rc = router->routeQuery(router_instance, rsession, read_buffer); + } + /** succeed */ if (rc == 1) { rc = 0; /**< here '0' means success */ @@ -1213,37 +1260,143 @@ return_rc: * so that it only cover the remaining buffer. * */ -static void* gw_MySQL_get_next_stmt( - void* buffer) +static GWBUF* gw_MySQL_get_next_stmt( + GWBUF** p_readbuf) { - GWBUF* readbuf = (GWBUF *)buffer; GWBUF* stmtbuf; - unsigned char* packet; - size_t len; + size_t buflen; + size_t strlen; + uint8_t* packet; - CHK_GWBUF(readbuf); - - if (GWBUF_EMPTY(readbuf)) + if (*p_readbuf == NULL) { stmtbuf = NULL; goto return_stmtbuf; - } - packet = GWBUF_DATA(readbuf); - len = packet[0]; - len += 255*packet[1]; - len += 255*255*packet[2]; + } + CHK_GWBUF(*p_readbuf); - /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ - if (len+4 > GWBUF_LENGTH(readbuf)) + if (GWBUF_EMPTY(*p_readbuf)) { stmtbuf = NULL; goto return_stmtbuf; } - stmtbuf = gwbuf_clone_portion(readbuf, 0, 4+len); - gwbuf_consume(readbuf, 4+len); + buflen = GWBUF_LENGTH((*p_readbuf)); + packet = GWBUF_DATA((*p_readbuf)); + strlen = MYSQL_GET_PACKET_LEN(packet); + + /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ + if (strlen-1 > buflen-5) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4); + *p_readbuf = gwbuf_consume(*p_readbuf, strlen+4); return_stmtbuf: - return (void *)stmtbuf; + return stmtbuf; +} + +/** + * Detect if buffer includes partial mysql packet or multiple packets. + * Store partial packet to pendingqueue. Send complete packets one by one + * to router. + */ +static int route_by_statement( + ROUTER* router_instance, + ROUTER_OBJECT* router, + void* rsession, + GWBUF* readbuf) +{ + int rc = -1; + DCB* master_dcb; + GWBUF* stmtbuf; + uint8_t* payload; + static size_t len; + +#if defined(SS_DEBUG) + uint8_t router_capabilities; + + router_capabilities = router->getCapabilities(router_instance, rsession); + + ss_dassert(router_capabilities == RCAP_TYPE_STMT_INPUT); +#endif + do + { + stmtbuf = gw_MySQL_get_next_stmt(&readbuf); + ss_dassert(stmtbuf != NULL); + CHK_GWBUF(stmtbuf); + + payload = (uint8_t *)GWBUF_DATA(stmtbuf); + len += MYSQL_GET_PACKET_LEN(payload); + /** + * If message is longer than read data, suspend routing and + * add statement buffer to wait queue. + */ + rc = router->routeQuery(router_instance, rsession, stmtbuf); + len = 0; /*< if routed, reset the length indicator */ + } + while (readbuf != NULL); + + return rc; } +/** + * Create a character array including the query string. + * GWBUF given as input includes either one complete or partial query. + * Length of buffer is at most the query length+4 (length of packet header). + */ +#if defined(NOT_USED) +static char* gw_get_or_create_querystr ( + void* data, + bool* new_allocation) +{ + GWBUF* buf = (GWBUF *)data; + size_t buflen; /*< first gw buffer data length */ + size_t packetlen; /*< length of mysql packet */ + size_t querylen; /*< total buffer length- */ + size_t nbytes_copied; + char* startpos; /*< first byte of query in gw buffer */ + char* str; /*< resulting query string */ + + CHK_GWBUF(buf); + packetlen = MYSQL_GET_PACKET_LEN((uint8_t *)GWBUF_DATA(buf)); + str = (char *)malloc(packetlen); /*< leave space for terminating null */ + + if (str == NULL) + { + goto return_str; + } + *new_allocation = true; + /** + * First buffer includes 4 bytes header and a type indicator byte. + */ + buflen = GWBUF_LENGTH(buf); + querylen = packetlen-1; + ss_dassert(buflen<=querylen+5); /*< 5 == header+type indicator */ + startpos = (char *)GWBUF_DATA(buf)+5; + nbytes_copied = MIN(querylen, buflen-5); + memcpy(str, startpos, nbytes_copied); + memset(&str[querylen-1], 0, 1); + buf = gwbuf_consume(buf, querylen-1); + + /** + * In case of multi-packet statement whole buffer consists of query + * string. + */ + while (buf != NULL) + { + buflen = GWBUF_LENGTH(buf); + memcpy(str+nbytes_copied, GWBUF_DATA(buf), buflen); + nbytes_copied += buflen; + buf = gwbuf_consume(buf, buflen); + } + ss_dassert(str[querylen-1] == 0); + +return_str: + return str; +} +#endif + + diff --git a/server/modules/routing/debugcli.c b/server/modules/routing/debugcli.c index 9c21c66f2..66c02c98b 100644 --- a/server/modules/routing/debugcli.c +++ b/server/modules/routing/debugcli.c @@ -54,6 +54,7 @@ static void closeSession(ROUTER *instance, void *router_session); static void freeSession(ROUTER *instance, void *router_session); static int execute(ROUTER *instance, void *router_session, GWBUF *queue); static void diagnostics(ROUTER *instance, DCB *dcb); +static uint8_t getCapabilities (ROUTER* inst, void* router_session); /** The module object definition */ static ROUTER_OBJECT MyObject = { @@ -64,7 +65,8 @@ static ROUTER_OBJECT MyObject = { execute, diagnostics, NULL, - NULL + NULL, + getCapabilities }; extern int execute_cmd(CLI_SESSION *cli); @@ -273,3 +275,10 @@ diagnostics(ROUTER *instance, DCB *dcb) { return; /* Nothing to do currently */ } + +static uint8_t getCapabilities( + ROUTER* inst, + void* router_session) +{ + return 0; +} \ No newline at end of file diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index a17e833b8..9bfa669e9 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -106,6 +106,8 @@ static void errorReply( char *message, DCB *backend_dcb, int action); +static uint8_t getCapabilities (ROUTER* inst, void* router_session); + /** The module object definition */ static ROUTER_OBJECT MyObject = { @@ -116,7 +118,8 @@ static ROUTER_OBJECT MyObject = { routeQuery, diagnostics, clientReply, - errorReply + errorReply, + getCapabilities }; static bool rses_begin_locked_router_action( @@ -379,6 +382,8 @@ int i; return NULL; } + client_rses->rses_capabilities = RCAP_TYPE_PACKET_INPUT; + /* * We now have the server with the least connections. * Bump the connection count for this server @@ -668,7 +673,7 @@ static void errorReply( ROUTER *instance, void *router_session, - char *message, + char *message, DCB *backend_dcb, int action) { @@ -737,3 +742,11 @@ static void rses_end_locked_router_action( CHK_CLIENT_RSES(rses); spinlock_release(&rses->rses_lock); } + + +static uint8_t getCapabilities( + ROUTER* inst, + void* router_session) +{ + return 0; +} \ No newline at end of file diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 8f38c79c3..a70b7c8d9 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -66,6 +67,8 @@ static void clientReply( void* router_session, GWBUF* queue, DCB* backend_dcb); +static uint8_t getCapabilities (ROUTER* inst, void* router_session); + static bool search_backend_servers( BACKEND** p_master, @@ -80,7 +83,8 @@ static ROUTER_OBJECT MyObject = { routeQuery, diagnostic, clientReply, - NULL + NULL, + getCapabilities }; static bool rses_begin_locked_router_action( ROUTER_CLIENT_SES* rses); @@ -416,6 +420,7 @@ static void* newSession( client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER]; router->stats.n_sessions += 1; + client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; /** * Version is bigger than zero once initialized. */ @@ -561,384 +566,363 @@ static int routeQuery( GWBUF* querybuf) { skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; - GWBUF* stmtbuf; + GWBUF* plainsqlbuf = NULL; char* querystr = NULL; char* startpos; - size_t len; unsigned char packet_type; - unsigned char* packet; + uint8_t* packet; int ret = 0; DCB* master_dcb = NULL; DCB* slave_dcb = NULL; -// GWBUF* bufcopy = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; bool rses_is_closed; rses_property_t* prop; + size_t len; CHK_CLIENT_RSES(router_cli_ses); - - inst->stats.n_queries++; - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) + + + /** Dirty read for quick check if router is closed. */ + if (router_cli_ses->rses_closed) { + rses_is_closed = true; + } + else + { + /** + * Lock router client session for secure read of DCBs + */ + rses_is_closed = + !(rses_begin_locked_router_action(router_cli_ses)); + } + + if (!rses_is_closed) + { + master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; + slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; + /** unlock */ + rses_end_locked_router_action(router_cli_ses); + } + + packet = GWBUF_DATA(querybuf); + packet_type = packet[4]; + + if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Failed to route %s:%s:\"%s\" to " + "backend server. %s.", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (querystr == NULL ? "(empty)" : querystr), + (rses_is_closed ? "Router was closed" : + "Router has no backend servers where to " + "route to")))); goto return_ret; } - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; + inst->stats.n_queries++; + startpos = (char *)&packet[5]; - /** stmtbuf is clone of querybuf, and only covers one stmt */ - stmtbuf = (GWBUF *)master_dcb->session->client->func.getstmt((void *)querybuf); - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); + switch(packet_type) { + case COM_QUIT: /**< 1 QUIT will close all sessions */ + case COM_INIT_DB: /**< 2 DDL must go to the master */ + case COM_REFRESH: /**< 7 - I guess this is session but not sure */ + case COM_DEBUG: /**< 0d all servers dump debug info to stdout */ + case COM_PING: /**< 0e all servers are pinged */ + case COM_CHANGE_USER: /**< 11 all servers change it accordingly */ + qtype = QUERY_TYPE_SESSION_WRITE; + break; + + case COM_CREATE_DB: /**< 5 DDL must go to the master */ + case COM_DROP_DB: /**< 6 DDL must go to the master */ + qtype = QUERY_TYPE_WRITE; + break; + + case COM_QUERY: + plainsqlbuf = gwbuf_clone_transform(querybuf, + GWBUF_TYPE_PLAINSQL); + len = GWBUF_LENGTH(plainsqlbuf); + /** unnecessary if buffer includes additional terminating null */ + querystr = (char *)malloc(len+1); + memcpy(querystr, startpos, len); + memset(&querystr[len], 0, 1); + // querystr = (char *)GWBUF_DATA(plainsqlbuf); + /* + querystr = master_dcb->func.getquerystr( + (void *) gwbuf_clone(querybuf), + &querystr_is_copy); + */ + qtype = skygw_query_classifier_get_type(querystr, 0); + break; + + case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ + case COM_STATISTICS: /**< 9 ? */ + case COM_PROCESS_INFO: /**< 0a ? */ + case COM_CONNECT: /**< 0b ? */ + case COM_PROCESS_KILL: /**< 0c ? */ + case COM_TIME: /**< 0f should this be run in gateway ? */ + case COM_DELAYED_INSERT: /**< 10 ? */ + case COM_DAEMON: /**< 1d ? */ + default: + break; + } /**< switch by packet type */ - while (stmtbuf != NULL) - { - packet = GWBUF_DATA(stmtbuf); - packet_type = packet[4]; - startpos = (char *)&packet[5]; - len = packet[0]; - len += 255*packet[1]; - len += 255*255*packet[2]; - - switch(packet_type) { - case COM_QUIT: /**< 1 QUIT will close all sessions */ - case COM_INIT_DB: /**< 2 DDL must go to the master */ - case COM_REFRESH: /**< 7 - I guess this is session but not sure */ - case COM_DEBUG: /**< 0d all servers dump debug info to stdout */ - case COM_PING: /**< 0e all servers are pinged */ - case COM_CHANGE_USER: /**< 11 all servers change it accordingly */ - qtype = QUERY_TYPE_SESSION_WRITE; - break; - - case COM_CREATE_DB: /**< 5 DDL must go to the master */ - case COM_DROP_DB: /**< 6 DDL must go to the master */ - qtype = QUERY_TYPE_WRITE; - break; - - case COM_QUERY: - querystr = (char *)malloc(len); - memcpy(querystr, startpos, len-1); - memset(&querystr[len-1], 0, 1); - qtype = skygw_query_classifier_get_type(querystr, 0); - break; - - case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ - case COM_STATISTICS: /**< 9 ? */ - case COM_PROCESS_INFO: /**< 0a ? */ - case COM_CONNECT: /**< 0b ? */ - case COM_PROCESS_KILL: /**< 0c ? */ - case COM_TIME: /**< 0f should this be run in gateway ? */ - case COM_DELAYED_INSERT: /**< 10 ? */ - case COM_DAEMON: /**< 1d ? */ - default: - break; - } /**< switch by packet type */ - - /** Dirty read for quick check if router is closed. */ - if (router_cli_ses->rses_closed) + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "String\t\"%s\"", + querystr == NULL ? "(empty)" : querystr))); + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Packet type\t%s", + STRPACKETTYPE(packet_type)))); + + switch (qtype) { + case QUERY_TYPE_WRITE: + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to Master.", + pthread_self(), + STRQTYPE(qtype)))); + + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(querybuf))); + + ret = master_dcb->func.write(master_dcb, querybuf); + atomic_add(&inst->stats.n_master, 1); + + goto return_ret; + break; + + case QUERY_TYPE_READ: + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to Slave.", + pthread_self(), + STRQTYPE(qtype)))); + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) { - rses_is_closed = true; + /** Log error to debug */ + goto return_ret; + } + /** + * If session command is being executed in slave + * route to master + */ + if (sescmd_cursor_is_active(rses_get_sescmd_cursor( + router_cli_ses, + BE_SLAVE))) + { + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(querybuf))); + + ret = master_dcb->func.write(master_dcb, querybuf); + atomic_add(&inst->stats.n_master, 1); } else { - /** - * Lock router client session for secure read of DCBs - */ - rses_is_closed = - !(rses_begin_locked_router_action(router_cli_ses)); - } - - if (!rses_is_closed) - { - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** unlock */ - rses_end_locked_router_action(router_cli_ses); - } - - if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Failed to route %s:%s:\"%s\" to " - "backend server. %s.", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (querystr == NULL ? "(empty)" : querystr), - (rses_is_closed ? "Router was closed" : - "Router has no backend servers where to " - "route to")))); - goto return_ret; - } - - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "String\t\"%s\"", - querystr == NULL ? "(empty)" : querystr))); - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Packet type\t%s", - STRPACKETTYPE(packet_type)))); - - switch (qtype) { - case QUERY_TYPE_WRITE: - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Master.", - pthread_self(), - STRQTYPE(qtype)))); - LOGIF(LT, tracelog_routed_query(router_cli_ses, "routeQuery", - master_dcb, - gwbuf_clone(stmtbuf))); + slave_dcb, + gwbuf_clone(querybuf))); - ret = master_dcb->func.write(master_dcb, stmtbuf); - atomic_add(&inst->stats.n_master, 1); + ret = slave_dcb->func.write(slave_dcb, querybuf); + atomic_add(&inst->stats.n_slave, 1); + } + rses_end_locked_router_action(router_cli_ses); + goto return_ret; + break; + + case QUERY_TYPE_SESSION_WRITE: + /** + * Execute in backends used by current router session. + * Save session variable commands to router session property + * struct. Thus, they + * can be replayed in backends which are started and joined later. + * + * Suppress OK packets sent to MaxScale by slaves. + * + * DOES THIS ALL APPLY TO COM_QUIT AS WELL?? + * + */ + + /** + * Update connections which are used in this session. + * + * For each connection updated, add a flag which indicates that + * OK Packet must arrive for this command before server + * in question is allowed to be used by router. That is, + * maintain a queue of pending OK packets and remove item + * from queue by FIFO. + * + * Return when the master responds OK Packet. Send that + * OK packet back to client. + * + * Suppress OK packets sent to MaxScale by slaves. + * + * Open questions: + * How to handle interleaving session write + * and queries? It would be simple if OK must be received + * from all/both servers before continuing query execution. + * How to maintain the order of operations? Execution queue + * would solve the problem. In the queue some things must be + * executed in serialized manner while some could be executed + * in parallel. Queries mostly. + * + * Instead of waiting for the OK packet from the master, the + * first OK packet could also be sent to client. TBD. + * vraa 9.12.13 + * + */ + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] DCB M:%p s:%p, " + "Query type\t%s, " + "packet type %s, routing to all servers.", + pthread_self(), + master_dcb, + slave_dcb, + STRQTYPE(qtype), + STRPACKETTYPE(packet_type)))); + + switch(packet_type) { + case COM_CHANGE_USER: - goto return_ret; + LOGIF(LT, tracelog_routed_query( + router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(querybuf))); + + master_dcb->func.auth( + master_dcb, + NULL, + master_dcb->session, + gwbuf_clone(querybuf)); + + LOGIF(LT, tracelog_routed_query( + router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(querybuf))); + + slave_dcb->func.auth( + slave_dcb, + NULL, + master_dcb->session, + querybuf); break; + + case COM_QUIT: + case COM_QUERY: + /** + * 1. Create new property of type RSES_PROP_TYPE_SESCMD. + * 2. Add property to the ROUTER_CLIENT_SES struct of + * this router session. + * 3. For each backend, and for each non-executed + * sescmd: + * call execution of current sescmd in + * all backends as long as both have executed + * them all. + * Execution call is dcb->func.session. + * All sescmds are executed when its return value is + * NULL, otherwise it is a pointer to next property. + */ + prop = rses_property_init(RSES_PROP_TYPE_SESCMD); + /** + * Additional reference is created to querybuf to + * prevent it from being released before properties + * are cleaned up as a part of router sessionclean-up. + */ + mysql_sescmd_init(prop, querybuf, router_cli_ses); - case QUERY_TYPE_READ: - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Slave.", - pthread_self(), - STRQTYPE(qtype)))); /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { - /** Log error to debug */ + rses_property_done(prop); goto return_ret; } - /** - * If session command is being executed in slave - * route to master - */ - if (sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_SLAVE))) + /** Add sescmd property to router client session */ + rses_property_add(router_cli_ses, prop); + + /** Execute session command in master */ + if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) { - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(stmtbuf))); - - ret = master_dcb->func.write(master_dcb, stmtbuf); - atomic_add(&inst->stats.n_master, 1); + ret = 1; } else { - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(stmtbuf))); - - ret = slave_dcb->func.write(slave_dcb, stmtbuf); - atomic_add(&inst->stats.n_slave, 1); + /** Log error */ } + /** Execute session command in slave */ + if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) + { + ret = 1; + } + else + { + /** Log error */ + } + + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); - goto return_ret; - break; - - case QUERY_TYPE_SESSION_WRITE: - /** - * Execute in backends used by current router session. - * Save session variable commands to router session property - * struct. Thus, they - * can be replayed in backends which are started and joined later. - * - * Suppress OK packets sent to MaxScale by slaves. - * - * DOES THIS ALL APPLY TO COM_QUIT AS WELL?? - * - */ - - /** - * Update connections which are used in this session. - * - * For each connection updated, add a flag which indicates that - * OK Packet must arrive for this command before server - * in question is allowed to be used by router. That is, - * maintain a queue of pending OK packets and remove item - * from queue by FIFO. - * - * Return when the master responds OK Packet. Send that - * OK packet back to client. - * - * Suppress OK packets sent to MaxScale by slaves. - * - * Open questions: - * How to handle interleaving session write - * and queries? It would be simple if OK must be received - * from all/both servers before continuing query execution. - * How to maintain the order of operations? Execution queue - * would solve the problem. In the queue some things must be - * executed in serialized manner while some could be executed - * in parallel. Queries mostly. - * - * Instead of waiting for the OK packet from the master, the - * first OK packet could also be sent to client. TBD. - * vraa 9.12.13 - * - */ - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] DCB M:%p s:%p, " - "Query type\t%s, " - "packet type %s, routing to all servers.", - pthread_self(), - master_dcb, - slave_dcb, - STRQTYPE(qtype), - STRPACKETTYPE(packet_type)))); - - switch(packet_type) { - /** - case COM_QUIT: - ret = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); - slave_dcb->func.write(slave_dcb, querybuf); - break; - */ - case COM_CHANGE_USER: - - LOGIF(LT, tracelog_routed_query( - router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(stmtbuf))); - - master_dcb->func.auth( - master_dcb, - NULL, - master_dcb->session, - gwbuf_clone(stmtbuf)); - - LOGIF(LT, tracelog_routed_query( - router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(stmtbuf))); - - slave_dcb->func.auth( - slave_dcb, - NULL, - master_dcb->session, - stmtbuf); - break; - - case COM_QUIT: - case COM_QUERY: - /** - * 1. Create new property of type RSES_PROP_TYPE_SESCMD. - * 2. Add property to the ROUTER_CLIENT_SES struct of - * this router session. - * 3. For each backend, and for each non-executed - * sescmd: - * call execution of current sescmd in - * all backends as long as both have executed - * them all. - * Execution call is dcb->func.session. - * All sescmds are executed when its return value is - * NULL, otherwise it is a pointer to next property. - */ - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); - /** - * Additional reference is created to querybuf to - * prevent it from being released before properties - * are cleaned up as a part of router sessionclean-up. - */ - mysql_sescmd_init(prop, stmtbuf, router_cli_ses); - - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - rses_property_done(prop); - goto return_ret; - } - /** Add sescmd property to router client session */ - rses_property_add(router_cli_ses, prop); - - /** Execute session command in master */ - if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) - { - ret = 1; - } - else - { - /** Log error */ - } - /** Execute session command in slave */ - if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) - { - ret = 1; - } - else - { - /** Log error */ - } - - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - break; - - default: - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(stmtbuf))); - ret = master_dcb->func.write(master_dcb, - (void *)gwbuf_clone(stmtbuf)); - - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(stmtbuf))); - - slave_dcb->func.write(slave_dcb, (void *)stmtbuf); - break; - } /**< switch by packet type */ - - atomic_add(&inst->stats.n_all, 1); - goto return_ret; break; default: - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Master by default.", - pthread_self(), - STRQTYPE(qtype)))); - - /** - * Is this really ok? - * What is not known is routed to master. - */ LOGIF(LT, tracelog_routed_query(router_cli_ses, "routeQuery", master_dcb, - gwbuf_clone(stmtbuf))); + gwbuf_clone(querybuf))); + ret = master_dcb->func.write(master_dcb, + (void *)gwbuf_clone(querybuf)); - ret = master_dcb->func.write(master_dcb, stmtbuf); - atomic_add(&inst->stats.n_master, 1); - goto return_ret; + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(querybuf))); + + slave_dcb->func.write(slave_dcb, (void *)querybuf); break; - } /*< switch by query type */ + } /**< switch by packet type */ - /** get next stmt */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto return_ret; - } - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; + atomic_add(&inst->stats.n_all, 1); + goto return_ret; + break; + + default: + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to Master by default.", + pthread_self(), + STRQTYPE(qtype)))); - /** stmtbuf is clone of querybuf, and only covers one stmt */ - stmtbuf = (GWBUF *)master_dcb->session->client->func.getstmt((void *)querybuf); - rses_end_locked_router_action(router_cli_ses); - } /* while (stmtbuf != NULL) */ + /** + * Is this really ok? + * What is not known is routed to master. + */ + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(querybuf))); + + ret = master_dcb->func.write(master_dcb, querybuf); + atomic_add(&inst->stats.n_master, 1); + goto return_ret; + break; + } /*< switch by query type */ + return_ret: - free(querystr); + if (plainsqlbuf != NULL) + { + gwbuf_free(plainsqlbuf); + } return ret; } @@ -1346,7 +1330,6 @@ static rses_property_t* rses_property_init( { goto return_prop; } -// spinlock_init(&prop->rses_prop_lock); prop->rses_prop_type = prop_type; #if defined(SS_DEBUG) prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY; @@ -1419,7 +1402,7 @@ static void rses_property_add( } } -/** Router sessiosn must be locked */ +/** Router session must be locked */ static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop) { @@ -1437,9 +1420,7 @@ static mysql_sescmd_t* rses_property_get_sescmd( } return sescmd; } - -// static rses_property_t* rses_property_get_ptr_next( - + /** static void rses_begin_locked_property_action( rses_property_t* prop) @@ -1505,18 +1486,12 @@ static bool sescmd_reply_to_client( { bool succp = false; - // rses_property_t* prop; - CHK_DCB(client_dcb); CHK_MYSQL_SESCMD(scmd); CHK_GWBUF(writebuf); ss_dassert(SPINLOCK_IS_LOCKED( &scmd->my_sescmd_prop->rses_prop_rsession->rses_lock)); -// prop = mysql_sescmd_get_property(scmd); - -// rses_begin_locked_property_action(prop); - if (!scmd->my_sescmd_is_replied) { client_dcb->func.write(client_dcb, writebuf); @@ -1531,7 +1506,6 @@ static bool sescmd_reply_to_client( else { } -// rses_end_locked_property_action(prop); return succp; } @@ -1651,7 +1625,6 @@ static bool execute_sescmd_in_backend( dcb, sescmd_cursor_clone_querybuf(scur))); rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); -// rc = dcb->func.session(dcb, sescmd_cursor_clone_querybuf(scur)); if (rc != 1) { @@ -1805,18 +1778,14 @@ static void tracelog_routed_query( DCB* dcb, GWBUF* buf) { - unsigned char* packet = GWBUF_DATA(buf); + uint8_t* packet = GWBUF_DATA(buf); unsigned char packet_type = packet[4]; size_t len; size_t buflen = GWBUF_LENGTH(buf); char* querystr; char* startpos = (char *)&packet[5]; backend_type_t be_type; - - len = packet[0]; - len += 255*packet[1]; - len += 255*255*packet[2]; - + if (rses->rses_dcb[BE_MASTER] == dcb) { be_type = BE_MASTER; @@ -1829,31 +1798,61 @@ static void tracelog_routed_query( { be_type = BE_UNDEFINED; } - - if (packet_type == '\x03') + if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL) { - querystr = (char *)malloc(len); - memcpy(querystr, startpos, len-1); - querystr[len-1] = '\0'; - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p", - pthread_self(), - funcname, - buflen, - querystr, - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->name : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->name : - "Target DCB is neither of the backends. This is error")), - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->port : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->port : - -1)), - STRBETYPE(be_type), - dcb))); + len = packet[0]; + len += 256*packet[1]; + len += 256*256*packet[2]; + + if (packet_type == '\x03') + { + querystr = (char *)malloc(len); + memcpy(querystr, startpos, len-1); + querystr[len-1] = '\0'; + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p", + pthread_self(), + funcname, + buflen, + querystr, + (be_type == BE_MASTER ? + rses->rses_backend[BE_MASTER]->backend_server->name : + (be_type == BE_SLAVE ? + rses->rses_backend[BE_SLAVE]->backend_server->name : + "Target DCB is neither of the backends. This is error")), + (be_type == BE_MASTER ? + rses->rses_backend[BE_MASTER]->backend_server->port : + (be_type == BE_SLAVE ? + rses->rses_backend[BE_SLAVE]->backend_server->port : + -1)), + STRBETYPE(be_type), + dcb))); + } } gwbuf_free(buf); +} + +/** + * Return rc, rc < 0 if router session is closed. rc == 0 if there are no + * capabilities specified, rc > 0 when there are capabilities. + */ +static uint8_t getCapabilities ( + ROUTER* inst, + void* router_session) +{ + ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; + uint8_t rc; + + if (!rses_begin_locked_router_action(rses)) + { + rc = 0xff; + goto return_rc; + } + rc = rses->rses_capabilities; + + rses_end_locked_router_action(rses); + +return_rc: + return rc; } \ No newline at end of file diff --git a/server/modules/routing/testroute.c b/server/modules/routing/testroute.c index 2e1615f25..c0bd73fee 100644 --- a/server/modules/routing/testroute.c +++ b/server/modules/routing/testroute.c @@ -26,6 +26,7 @@ static void closeSession(ROUTER *instance, void *session); static void freeSession(ROUTER *instance, void *session); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static void diagnostic(ROUTER *instance, DCB *dcb); +static uint8_t getCapabilities (ROUTER* inst, void* router_session); static ROUTER_OBJECT MyObject = { @@ -36,7 +37,8 @@ static ROUTER_OBJECT MyObject = { routeQuery, diagnostic, NULL, - NULL + NULL, + getCapabilities }; /** @@ -137,3 +139,10 @@ static void diagnostic(ROUTER *instance, DCB *dcb) { } + +static uint8_t getCapabilities( + ROUTER* inst, + void* router_session) +{ + return 0; +} \ No newline at end of file