Use new type names in insertstream

Use the renamed types and change numeric constants to defines.
This commit is contained in:
Markus Mäkelä
2017-01-13 14:41:58 +02:00
parent a3a9edd3b6
commit 41d2d65e71

View File

@ -28,16 +28,16 @@
* @file datastream.c - Streaming of bulk inserts * @file datastream.c - Streaming of bulk inserts
*/ */
static FILTER *createInstance(const char *name, char **options, CONFIG_PARAMETER*params); static MXS_FILTER *createInstance(const char *name, char **options, CONFIG_PARAMETER*params);
static void *newSession(FILTER *instance, SESSION *session); static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session); static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session);
static void freeSession(FILTER *instance, void *session); static void freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); static void setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, MXS_DOWNSTREAM *downstream);
static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream); static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *upstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); static int32_t routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); static void diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb);
static uint64_t getCapabilities(void); static uint64_t getCapabilities(void);
static int clientReply(FILTER* instance, void *session, GWBUF *reply); static int32_t clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply);
static bool extract_insert_target(GWBUF *buffer, char* target, int len); static bool extract_insert_target(GWBUF *buffer, char* target, int len);
static GWBUF* create_load_data_command(const char *target); static GWBUF* create_load_data_command(const char *target);
static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num); static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num);
@ -65,8 +65,8 @@ enum ds_state
*/ */
typedef struct typedef struct
{ {
DOWNSTREAM down; /**< Downstream filter */ MXS_DOWNSTREAM down; /**< Downstream filter */
UPSTREAM up; /**< Upstream filter*/ MXS_UPSTREAM up; /**< Upstream filter*/
GWBUF *queue; /**< Queue containing a stored query */ GWBUF *queue; /**< Queue containing a stored query */
bool active; /**< Whether the session is active */ bool active; /**< Whether the session is active */
uint8_t packet_num; /**< If stream is open, the current packet sequence number */ uint8_t packet_num; /**< If stream is open, the current packet sequence number */
@ -86,7 +86,7 @@ typedef struct
MXS_MODULE* MXS_CREATE_MODULE() MXS_MODULE* MXS_CREATE_MODULE()
{ {
static FILTER_OBJECT MyObject = static MXS_FILTER_OBJECT MyObject =
{ {
createInstance, createInstance,
newSession, newSession,
@ -105,7 +105,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
{ {
MXS_MODULE_API_FILTER, MXS_MODULE_API_FILTER,
MXS_MODULE_EXPERIMENTAL, MXS_MODULE_EXPERIMENTAL,
FILTER_VERSION, MXS_FILTER_VERSION,
"Data streaming filter", "Data streaming filter",
"1.0.0", "1.0.0",
&MyObject, &MyObject,
@ -152,7 +152,7 @@ static const char load_data_template[] = "LOAD DATA LOCAL INFILE 'maxscale.data'
* *
* @return The instance data for this new instance * @return The instance data for this new instance
*/ */
static FILTER * static MXS_FILTER *
createInstance(const char *name, char **options, CONFIG_PARAMETER *params) createInstance(const char *name, char **options, CONFIG_PARAMETER *params)
{ {
DS_INSTANCE *my_instance; DS_INSTANCE *my_instance;
@ -163,7 +163,7 @@ createInstance(const char *name, char **options, CONFIG_PARAMETER *params)
my_instance->user = config_copy_string(params, "user"); my_instance->user = config_copy_string(params, "user");
} }
return (FILTER *) my_instance; return (MXS_FILTER *) my_instance;
} }
/** /**
@ -173,8 +173,8 @@ createInstance(const char *name, char **options, CONFIG_PARAMETER *params)
* @param session The session itself * @param session The session itself
* @return Session specific data for this session * @return Session specific data for this session
*/ */
static void * static MXS_FILTER_SESSION *
newSession(FILTER *instance, SESSION *session) newSession(MXS_FILTER *instance, SESSION *session)
{ {
DS_INSTANCE *my_instance = (DS_INSTANCE *) instance; DS_INSTANCE *my_instance = (DS_INSTANCE *) instance;
DS_SESSION *my_session; DS_SESSION *my_session;
@ -199,7 +199,7 @@ newSession(FILTER *instance, SESSION *session)
} }
} }
return my_session; return (MXS_FILTER_SESSION*)my_session;
} }
/** /**
@ -210,7 +210,7 @@ newSession(FILTER *instance, SESSION *session)
* @param session The session being closed * @param session The session being closed
*/ */
static void static void
closeSession(FILTER *instance, void *session) closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session)
{ {
} }
@ -221,10 +221,9 @@ closeSession(FILTER *instance, void *session)
* @param session The session being closed * @param session The session being closed
*/ */
static void static void
freeSession(FILTER *instance, void *session) freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session)
{ {
MXS_FREE(session); MXS_FREE(session);
return;
} }
/** /**
@ -235,7 +234,7 @@ freeSession(FILTER *instance, void *session)
* @param downstream The downstream filter or router * @param downstream The downstream filter or router
*/ */
static void static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_DOWNSTREAM *downstream)
{ {
DS_SESSION *my_session = (DS_SESSION*) session; DS_SESSION *my_session = (DS_SESSION*) session;
my_session->down = *downstream; my_session->down = *downstream;
@ -247,7 +246,7 @@ setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
* @param session Filter session * @param session Filter session
* @param upstream Upstream filter * @param upstream Upstream filter
*/ */
static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *upstream)
{ {
DS_SESSION *my_session = (DS_SESSION*) session; DS_SESSION *my_session = (DS_SESSION*) session;
my_session->up = *upstream; my_session->up = *upstream;
@ -265,7 +264,7 @@ static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
* *
* @return 1 on success, 0 on error * @return 1 on success, 0 on error
*/ */
static int routeQuery(FILTER *instance, void *session, GWBUF *queue) static int32_t routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
{ {
DS_SESSION *my_session = (DS_SESSION *) session; DS_SESSION *my_session = (DS_SESSION *) session;
char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1]; char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1];
@ -277,39 +276,48 @@ static int routeQuery(FILTER *instance, void *session, GWBUF *queue)
if (session_trx_is_active(my_session->client_dcb->session) && if (session_trx_is_active(my_session->client_dcb->session) &&
extract_insert_target(queue, target, sizeof(target))) extract_insert_target(queue, target, sizeof(target)))
{ {
if (my_session->state == DS_STREAM_CLOSED) switch (my_session->state)
{ {
case DS_STREAM_CLOSED:
/** We're opening a new stream */ /** We're opening a new stream */
strcpy(my_session->target, target); strcpy(my_session->target, target);
my_session->queue = queue; my_session->queue = queue;
my_session->state = DS_REQUEST_SENT; my_session->state = DS_REQUEST_SENT;
my_session->packet_num = 0; my_session->packet_num = 0;
queue = create_load_data_command(target); queue = create_load_data_command(target);
} break;
else
{
if (my_session->state == DS_REQUEST_ACCEPTED)
{
my_session->state = DS_STREAM_OPEN;
}
if (my_session->state == DS_STREAM_OPEN) case DS_REQUEST_ACCEPTED:
{ my_session->state = DS_STREAM_OPEN;
/** Fallthrough */
case DS_STREAM_OPEN:
if (strcmp(target, my_session->target) == 0) if (strcmp(target, my_session->target) == 0)
{ {
/** Stream is open and targets match, convert the insert into /**
* a data stream */ * Stream is open and targets match, convert the insert into
* a data stream
*/
uint8_t packet_num = ++my_session->packet_num; uint8_t packet_num = ++my_session->packet_num;
send_ok = true; send_ok = true;
queue = convert_to_stream(queue, packet_num); queue = convert_to_stream(queue, packet_num);
} }
else else
{ {
/** Target mismatch */ /**
* Target mismatch
*
* TODO: Instead of sending an error, we could just open a new stream
*/
gwbuf_free(queue); gwbuf_free(queue);
send_error = true; send_error = true;
} }
} break;
default:
MXS_ERROR("Unexpected state: %d", my_session->state);
ss_dassert(false);
break;
} }
} }
else else
@ -319,18 +327,24 @@ static int routeQuery(FILTER *instance, void *session, GWBUF *queue)
uint8_t packet_num; uint8_t packet_num;
*my_session->target = '\0'; *my_session->target = '\0';
if (my_session->state == DS_STREAM_OPEN) switch (my_session->state)
{ {
case DS_STREAM_OPEN:
/** Stream is open, we need to close it */ /** Stream is open, we need to close it */
my_session->state = DS_CLOSING_STREAM; my_session->state = DS_CLOSING_STREAM;
send_empty = true; send_empty = true;
packet_num = ++my_session->packet_num; packet_num = ++my_session->packet_num;
my_session->queue = queue; my_session->queue = queue;
} break;
else if (my_session->state == DS_REQUEST_ACCEPTED)
{ case DS_REQUEST_ACCEPTED:
my_session->state = DS_STREAM_OPEN; my_session->state = DS_STREAM_OPEN;
send_ok = true; send_ok = true;
break;
default:
ss_dassert(my_session->state == DS_STREAM_CLOSED);
break;
} }
if (send_empty) if (send_empty)
@ -393,12 +407,12 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num)
{ {
/** Remove the INSERT INTO ... from the buffer */ /** Remove the INSERT INTO ... from the buffer */
char *dataptr = (char*)GWBUF_DATA(buffer); char *dataptr = (char*)GWBUF_DATA(buffer);
char *modptr = strnchr_esc_mysql(dataptr + 5, '(', GWBUF_LENGTH(buffer)); char *modptr = strnchr_esc_mysql(dataptr + MYSQL_HEADER_LEN + 1, '(', GWBUF_LENGTH(buffer));
/** Leave some space for the header so we don't have to allocate a new one */ /** Leave some space for the header so we don't have to allocate a new one */
buffer = gwbuf_consume(buffer, (modptr - dataptr) - 4); buffer = gwbuf_consume(buffer, (modptr - dataptr) - MYSQL_HEADER_LEN);
char* header_start = (char*)GWBUF_DATA(buffer); char* header_start = (char*)GWBUF_DATA(buffer);
char* store_end = dataptr = header_start + 4; char* store_end = dataptr = header_start + MYSQL_HEADER_LEN;
char* end = buffer->end; char* end = buffer->end;
char* value; char* value;
uint32_t valuesize; uint32_t valuesize;
@ -415,7 +429,7 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num)
} }
gwbuf_rtrim(buffer, (char*)buffer->end - store_end); gwbuf_rtrim(buffer, (char*)buffer->end - store_end);
uint32_t len = gwbuf_length(buffer) - 4; uint32_t len = gwbuf_length(buffer) - MYSQL_HEADER_LEN;
*header_start++ = len; *header_start++ = len;
*header_start++ = len >> 8; *header_start++ = len >> 8;
@ -435,7 +449,7 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num)
* *
* @return 1 on success, 0 on error * @return 1 on success, 0 on error
*/ */
static int clientReply(FILTER* instance, void *session, GWBUF *reply) static int32_t clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply)
{ {
DS_SESSION *my_session = (DS_SESSION*) session; DS_SESSION *my_session = (DS_SESSION*) session;
int rc = 1; int rc = 1;
@ -482,7 +496,7 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply)
* @param fsession Filter session, may be NULL * @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output * @param dcb The DCB for diagnostic output
*/ */
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) static void diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb)
{ {
DS_INSTANCE *my_instance = (DS_INSTANCE *) instance; DS_INSTANCE *my_instance = (DS_INSTANCE *) instance;
@ -517,7 +531,7 @@ static bool only_implicit_values(GWBUF *buffer)
{ {
bool rval = false; bool rval = false;
char *data = (char*)GWBUF_DATA(buffer); char *data = (char*)GWBUF_DATA(buffer);
char *ptr = strnchr_esc_mysql(data + 5, '(', GWBUF_LENGTH(buffer)); char *ptr = strnchr_esc_mysql(data + MYSQL_HEADER_LEN + 1, '(', GWBUF_LENGTH(buffer));
if (ptr && (ptr = strnchr_esc_mysql(ptr, ')', GWBUF_LENGTH(buffer) - (ptr - data)))) if (ptr && (ptr = strnchr_esc_mysql(ptr, ')', GWBUF_LENGTH(buffer) - (ptr - data))))
{ {
@ -597,7 +611,7 @@ static GWBUF* create_load_data_command(const char *target)
snprintf(str, sizeof(str), load_data_template, target); snprintf(str, sizeof(str), load_data_template, target);
uint32_t payload = strlen(str) + 1; uint32_t payload = strlen(str) + 1;
GWBUF *rval = gwbuf_alloc(payload + 4); GWBUF *rval = gwbuf_alloc(payload + MYSQL_HEADER_LEN);
if (rval) if (rval)
{ {
uint8_t *ptr = GWBUF_DATA(rval); uint8_t *ptr = GWBUF_DATA(rval);