From 1911430e9630cceb1fc08dc8f7017a41fc5b6e4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 13 Jan 2017 10:30:02 +0200 Subject: [PATCH] Clean up insertstream source documentation Cleaned up code and source documentation. --- .../filter/insertstream/insertstream.c | 104 +++++++++++++----- 1 file changed, 74 insertions(+), 30 deletions(-) diff --git a/server/modules/filter/insertstream/insertstream.c b/server/modules/filter/insertstream/insertstream.c index 07dfb1d0a..08dabf07b 100644 --- a/server/modules/filter/insertstream/insertstream.c +++ b/server/modules/filter/insertstream/insertstream.c @@ -47,17 +47,17 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num); */ typedef struct { - char *source; /*< Source address to restrict matches */ - char *user; /*< User name to restrict matches */ + char *source; /**< Source address to restrict matches */ + char *user; /**< User name to restrict matches */ } DS_INSTANCE; enum ds_state { - DS_STREAM_CLOSED, - DS_REQUEST_SENT, - DS_REQUEST_ACCEPTED, - DS_STREAM_OPEN, - DS_CLOSING_STREAM + DS_STREAM_CLOSED, /**< Initial state */ + DS_REQUEST_SENT, /**< Request for stream sent */ + DS_REQUEST_ACCEPTED, /**< Stream request accepted */ + DS_STREAM_OPEN, /**< Stream is open */ + DS_CLOSING_STREAM /**< Stream is about to be closed */ }; /** @@ -65,15 +65,14 @@ enum ds_state */ typedef struct { - DOWNSTREAM down; /* The downstream filter */ - UPSTREAM up; - GWBUF *queue; - GWBUF *writebuf; - bool active; + DOWNSTREAM down; /**< Downstream filter */ + UPSTREAM up; /**< Upstream filter*/ + GWBUF *queue; /**< Queue containing a stored query */ + bool active; /**< Whether the session is active */ + uint8_t packet_num; /**< If stream is open, the current packet sequence number */ + DCB* client_dcb; /**< Client DCB */ + enum ds_state state; /**< The current state of the stream */ char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1]; /**< Current target table */ - uint8_t packet_num; - DCB* client_dcb; - enum ds_state state; /*< Whether a LOAD DATA LOCAL INFILE was sent or not */ } DS_SESSION; /** @@ -263,9 +262,10 @@ static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) * @param instance The filter instance data * @param session The filter session * @param queue The query data + * + * @return 1 on success, 0 on error */ -static int -routeQuery(FILTER *instance, void *session, GWBUF *queue) +static int routeQuery(FILTER *instance, void *session, GWBUF *queue) { DS_SESSION *my_session = (DS_SESSION *) session; char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1]; @@ -358,6 +358,9 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue) return rc; } +/** + * Extract inserted values + */ static char* get_value(char* data, uint32_t datalen, char** dest, uint32_t* destlen) { char *value_start = strnchr_esc_mysql(data, '(', datalen); @@ -378,10 +381,18 @@ static char* get_value(char* data, uint32_t datalen, char** dest, uint32_t* dest return NULL; } +/** + * @brief Convert an INSERT query into a CSV stream + * + * @param buffer Buffer containing the query + * @param packet_num The current packet sequence number + * + * @return The modified buffer + */ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num) { /** Remove the INSERT INTO ... from the buffer */ - char *dataptr = (char*) GWBUF_DATA(buffer); + char *dataptr = (char*)GWBUF_DATA(buffer); char *modptr = strnchr_esc_mysql(dataptr + 5, '(', GWBUF_LENGTH(buffer)); /** Leave some space for the header so we don't have to allocate a new one */ @@ -392,10 +403,12 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num) char* value; uint32_t valuesize; - /** Remove the parentheses from the insert and recalculate the packet length */ + /** + * Remove the parentheses from the insert, add newlines between values and + * recalculate the packet length + */ while ((dataptr = get_value(dataptr, end - dataptr, &value, &valuesize))) { - // TODO: Don't move everything, only move the needed parts memmove(store_end, value, valuesize); store_end += valuesize; *store_end++ = '\n'; @@ -412,9 +425,20 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num) return buffer; } +/** + * @brief Handle replies from the backend + * + * + * @param instance Filter instance + * @param session Filter session + * @param reply The reply from the backend + * + * @return 1 on success, 0 on error + */ static int clientReply(FILTER* instance, void *session, GWBUF *reply) { DS_SESSION *my_session = (DS_SESSION*) session; + int rc = 1; if (my_session->state == DS_CLOSING_STREAM || (my_session->state == DS_REQUEST_SENT && @@ -440,11 +464,11 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply) } else { - return my_session->up.clientReply(my_session->up.instance, - my_session->up.session, reply); + rc = my_session->up.clientReply(my_session->up.instance, + my_session->up.session, reply); } - return 0; + return rc; } /** @@ -454,12 +478,11 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply) * instance as a whole, otherwise print diagnostics for the * particular session. * - * @param instance The filter instance - * @param fsession Filter session, may be NULL - * @param dcb The DCB for diagnostic output + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output */ -static void -diagnostic(FILTER *instance, void *fsession, DCB *dcb) +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) { DS_INSTANCE *my_instance = (DS_INSTANCE *) instance; @@ -473,11 +496,23 @@ diagnostic(FILTER *instance, void *fsession, DCB *dcb) } } +/** + * @brief Get filter capabilities + * + * @return Filter capabilities + */ static uint64_t getCapabilities(void) { return RCAP_TYPE_TRANSACTION_TRACKING; } +/** + * @brief Check if an insert statement has implicitly ordered values + * + * @param buffer Buffer to check + * + * @return True if the insert does not define the order of the values + */ static bool only_implicit_values(GWBUF *buffer) { bool rval = false; @@ -512,10 +547,12 @@ static bool only_implicit_values(GWBUF *buffer) } /** - * Check if a buffer contains an insert statement + * @brief Extract insert target * * @param buffer Buffer to analyze - * @return True if the buffer contains an insert statement + * + * @return True if the buffer contains an insert statement and the target table + * was successfully extracted */ static bool extract_insert_target(GWBUF *buffer, char* target, int len) { @@ -547,6 +584,13 @@ static bool extract_insert_target(GWBUF *buffer, char* target, int len) return rval; } +/** + * @brief Create a LOAD DATA LOCAL INFILE statement + * + * @param target The table name where the data is loaded + * + * @return Buffer containing the statement or NULL if memory allocation failed + */ static GWBUF* create_load_data_command(const char *target) { char str[sizeof(load_data_template) + strlen(target) + 1];