From 41d2d65e71f899b528a399405612aa9b43eac92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 13 Jan 2017 14:41:58 +0200 Subject: [PATCH] Use new type names in insertstream Use the renamed types and change numeric constants to defines. --- .../filter/insertstream/insertstream.c | 140 ++++++++++-------- 1 file changed, 77 insertions(+), 63 deletions(-) diff --git a/server/modules/filter/insertstream/insertstream.c b/server/modules/filter/insertstream/insertstream.c index 08dabf07b..8d09ba2f6 100644 --- a/server/modules/filter/insertstream/insertstream.c +++ b/server/modules/filter/insertstream/insertstream.c @@ -28,16 +28,16 @@ * @file datastream.c - Streaming of bulk inserts */ -static FILTER *createInstance(const char *name, char **options, CONFIG_PARAMETER*params); -static void *newSession(FILTER *instance, SESSION *session); -static void closeSession(FILTER *instance, void *session); -static void freeSession(FILTER *instance, void *session); -static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); -static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream); -static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); -static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); +static MXS_FILTER *createInstance(const char *name, char **options, CONFIG_PARAMETER*params); +static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance, SESSION *session); +static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session); +static void freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session); +static void setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, MXS_DOWNSTREAM *downstream); +static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *upstream); +static int32_t routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, GWBUF *queue); +static void diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb); static uint64_t getCapabilities(void); -static int clientReply(FILTER* instance, void *session, GWBUF *reply); +static int32_t clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply); static bool extract_insert_target(GWBUF *buffer, char* target, int len); static GWBUF* create_load_data_command(const char *target); static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num); @@ -65,8 +65,8 @@ enum ds_state */ typedef struct { - DOWNSTREAM down; /**< Downstream filter */ - UPSTREAM up; /**< Upstream filter*/ + MXS_DOWNSTREAM down; /**< Downstream filter */ + MXS_UPSTREAM up; /**< Upstream filter*/ GWBUF *queue; /**< Queue containing a stored query */ bool active; /**< Whether the session is active */ uint8_t packet_num; /**< If stream is open, the current packet sequence number */ @@ -86,7 +86,7 @@ typedef struct MXS_MODULE* MXS_CREATE_MODULE() { - static FILTER_OBJECT MyObject = + static MXS_FILTER_OBJECT MyObject = { createInstance, newSession, @@ -105,7 +105,7 @@ MXS_MODULE* MXS_CREATE_MODULE() { MXS_MODULE_API_FILTER, MXS_MODULE_EXPERIMENTAL, - FILTER_VERSION, + MXS_FILTER_VERSION, "Data streaming filter", "1.0.0", &MyObject, @@ -152,7 +152,7 @@ static const char load_data_template[] = "LOAD DATA LOCAL INFILE 'maxscale.data' * * @return The instance data for this new instance */ -static FILTER * +static MXS_FILTER * createInstance(const char *name, char **options, CONFIG_PARAMETER *params) { DS_INSTANCE *my_instance; @@ -163,7 +163,7 @@ createInstance(const char *name, char **options, CONFIG_PARAMETER *params) my_instance->user = config_copy_string(params, "user"); } - return (FILTER *) my_instance; + return (MXS_FILTER *) my_instance; } /** @@ -173,8 +173,8 @@ createInstance(const char *name, char **options, CONFIG_PARAMETER *params) * @param session The session itself * @return Session specific data for this session */ -static void * -newSession(FILTER *instance, SESSION *session) +static MXS_FILTER_SESSION * +newSession(MXS_FILTER *instance, SESSION *session) { DS_INSTANCE *my_instance = (DS_INSTANCE *) instance; DS_SESSION *my_session; @@ -199,7 +199,7 @@ newSession(FILTER *instance, SESSION *session) } } - return my_session; + return (MXS_FILTER_SESSION*)my_session; } /** @@ -210,7 +210,7 @@ newSession(FILTER *instance, SESSION *session) * @param session The session being closed */ static void -closeSession(FILTER *instance, void *session) +closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session) { } @@ -221,10 +221,9 @@ closeSession(FILTER *instance, void *session) * @param session The session being closed */ static void -freeSession(FILTER *instance, void *session) +freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session) { MXS_FREE(session); - return; } /** @@ -235,7 +234,7 @@ freeSession(FILTER *instance, void *session) * @param downstream The downstream filter or router */ static void -setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_DOWNSTREAM *downstream) { DS_SESSION *my_session = (DS_SESSION*) session; my_session->down = *downstream; @@ -247,7 +246,7 @@ setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) * @param session Filter session * @param upstream Upstream filter */ -static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) +static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *upstream) { DS_SESSION *my_session = (DS_SESSION*) session; my_session->up = *upstream; @@ -265,7 +264,7 @@ static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) * * @return 1 on success, 0 on error */ -static int routeQuery(FILTER *instance, void *session, GWBUF *queue) +static int32_t routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue) { DS_SESSION *my_session = (DS_SESSION *) session; char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1]; @@ -277,39 +276,48 @@ static int routeQuery(FILTER *instance, void *session, GWBUF *queue) if (session_trx_is_active(my_session->client_dcb->session) && extract_insert_target(queue, target, sizeof(target))) { - if (my_session->state == DS_STREAM_CLOSED) + switch (my_session->state) { - /** We're opening a new stream */ - strcpy(my_session->target, target); - my_session->queue = queue; - my_session->state = DS_REQUEST_SENT; - my_session->packet_num = 0; - queue = create_load_data_command(target); - } - else - { - if (my_session->state == DS_REQUEST_ACCEPTED) - { - my_session->state = DS_STREAM_OPEN; - } + case DS_STREAM_CLOSED: + /** We're opening a new stream */ + strcpy(my_session->target, target); + my_session->queue = queue; + my_session->state = DS_REQUEST_SENT; + my_session->packet_num = 0; + queue = create_load_data_command(target); + break; - if (my_session->state == DS_STREAM_OPEN) - { + case DS_REQUEST_ACCEPTED: + my_session->state = DS_STREAM_OPEN; + /** Fallthrough */ + + case DS_STREAM_OPEN: if (strcmp(target, my_session->target) == 0) { - /** Stream is open and targets match, convert the insert into - * a data stream */ + /** + * Stream is open and targets match, convert the insert into + * a data stream + */ uint8_t packet_num = ++my_session->packet_num; send_ok = true; queue = convert_to_stream(queue, packet_num); } else { - /** Target mismatch */ + /** + * Target mismatch + * + * TODO: Instead of sending an error, we could just open a new stream + */ gwbuf_free(queue); send_error = true; } - } + break; + + default: + MXS_ERROR("Unexpected state: %d", my_session->state); + ss_dassert(false); + break; } } else @@ -319,18 +327,24 @@ static int routeQuery(FILTER *instance, void *session, GWBUF *queue) uint8_t packet_num; *my_session->target = '\0'; - if (my_session->state == DS_STREAM_OPEN) + switch (my_session->state) { - /** Stream is open, we need to close it */ - my_session->state = DS_CLOSING_STREAM; - send_empty = true; - packet_num = ++my_session->packet_num; - my_session->queue = queue; - } - else if (my_session->state == DS_REQUEST_ACCEPTED) - { - my_session->state = DS_STREAM_OPEN; - send_ok = true; + case DS_STREAM_OPEN: + /** Stream is open, we need to close it */ + my_session->state = DS_CLOSING_STREAM; + send_empty = true; + packet_num = ++my_session->packet_num; + my_session->queue = queue; + break; + + case DS_REQUEST_ACCEPTED: + my_session->state = DS_STREAM_OPEN; + send_ok = true; + break; + + default: + ss_dassert(my_session->state == DS_STREAM_CLOSED); + break; } if (send_empty) @@ -393,12 +407,12 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num) { /** Remove the INSERT INTO ... from the buffer */ char *dataptr = (char*)GWBUF_DATA(buffer); - char *modptr = strnchr_esc_mysql(dataptr + 5, '(', GWBUF_LENGTH(buffer)); + char *modptr = strnchr_esc_mysql(dataptr + MYSQL_HEADER_LEN + 1, '(', GWBUF_LENGTH(buffer)); /** Leave some space for the header so we don't have to allocate a new one */ - buffer = gwbuf_consume(buffer, (modptr - dataptr) - 4); + buffer = gwbuf_consume(buffer, (modptr - dataptr) - MYSQL_HEADER_LEN); char* header_start = (char*)GWBUF_DATA(buffer); - char* store_end = dataptr = header_start + 4; + char* store_end = dataptr = header_start + MYSQL_HEADER_LEN; char* end = buffer->end; char* value; uint32_t valuesize; @@ -415,7 +429,7 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num) } gwbuf_rtrim(buffer, (char*)buffer->end - store_end); - uint32_t len = gwbuf_length(buffer) - 4; + uint32_t len = gwbuf_length(buffer) - MYSQL_HEADER_LEN; *header_start++ = len; *header_start++ = len >> 8; @@ -435,7 +449,7 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num) * * @return 1 on success, 0 on error */ -static int clientReply(FILTER* instance, void *session, GWBUF *reply) +static int32_t clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply) { DS_SESSION *my_session = (DS_SESSION*) session; int rc = 1; @@ -482,7 +496,7 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply) * @param fsession Filter session, may be NULL * @param dcb The DCB for diagnostic output */ -static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) +static void diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb) { DS_INSTANCE *my_instance = (DS_INSTANCE *) instance; @@ -517,7 +531,7 @@ static bool only_implicit_values(GWBUF *buffer) { bool rval = false; char *data = (char*)GWBUF_DATA(buffer); - char *ptr = strnchr_esc_mysql(data + 5, '(', GWBUF_LENGTH(buffer)); + char *ptr = strnchr_esc_mysql(data + MYSQL_HEADER_LEN + 1, '(', GWBUF_LENGTH(buffer)); if (ptr && (ptr = strnchr_esc_mysql(ptr, ')', GWBUF_LENGTH(buffer) - (ptr - data)))) { @@ -597,7 +611,7 @@ static GWBUF* create_load_data_command(const char *target) snprintf(str, sizeof(str), load_data_template, target); uint32_t payload = strlen(str) + 1; - GWBUF *rval = gwbuf_alloc(payload + 4); + GWBUF *rval = gwbuf_alloc(payload + MYSQL_HEADER_LEN); if (rval) { uint8_t *ptr = GWBUF_DATA(rval);