Clean up insertstream source documentation
Cleaned up code and source documentation.
This commit is contained in:
@ -47,17 +47,17 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num);
|
|||||||
*/
|
*/
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
char *source; /*< Source address to restrict matches */
|
char *source; /**< Source address to restrict matches */
|
||||||
char *user; /*< User name to restrict matches */
|
char *user; /**< User name to restrict matches */
|
||||||
} DS_INSTANCE;
|
} DS_INSTANCE;
|
||||||
|
|
||||||
enum ds_state
|
enum ds_state
|
||||||
{
|
{
|
||||||
DS_STREAM_CLOSED,
|
DS_STREAM_CLOSED, /**< Initial state */
|
||||||
DS_REQUEST_SENT,
|
DS_REQUEST_SENT, /**< Request for stream sent */
|
||||||
DS_REQUEST_ACCEPTED,
|
DS_REQUEST_ACCEPTED, /**< Stream request accepted */
|
||||||
DS_STREAM_OPEN,
|
DS_STREAM_OPEN, /**< Stream is open */
|
||||||
DS_CLOSING_STREAM
|
DS_CLOSING_STREAM /**< Stream is about to be closed */
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -65,15 +65,14 @@ enum ds_state
|
|||||||
*/
|
*/
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
DOWNSTREAM down; /* The downstream filter */
|
DOWNSTREAM down; /**< Downstream filter */
|
||||||
UPSTREAM up;
|
UPSTREAM up; /**< Upstream filter*/
|
||||||
GWBUF *queue;
|
GWBUF *queue; /**< Queue containing a stored query */
|
||||||
GWBUF *writebuf;
|
bool active; /**< Whether the session is active */
|
||||||
bool active;
|
uint8_t packet_num; /**< If stream is open, the current packet sequence number */
|
||||||
|
DCB* client_dcb; /**< Client DCB */
|
||||||
|
enum ds_state state; /**< The current state of the stream */
|
||||||
char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1]; /**< Current target table */
|
char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1]; /**< Current target table */
|
||||||
uint8_t packet_num;
|
|
||||||
DCB* client_dcb;
|
|
||||||
enum ds_state state; /*< Whether a LOAD DATA LOCAL INFILE was sent or not */
|
|
||||||
} DS_SESSION;
|
} DS_SESSION;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -263,9 +262,10 @@ static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
|
|||||||
* @param instance The filter instance data
|
* @param instance The filter instance data
|
||||||
* @param session The filter session
|
* @param session The filter session
|
||||||
* @param queue The query data
|
* @param queue The query data
|
||||||
|
*
|
||||||
|
* @return 1 on success, 0 on error
|
||||||
*/
|
*/
|
||||||
static int
|
static int routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||||
routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|
||||||
{
|
{
|
||||||
DS_SESSION *my_session = (DS_SESSION *) session;
|
DS_SESSION *my_session = (DS_SESSION *) session;
|
||||||
char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1];
|
char target[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 1];
|
||||||
@ -358,6 +358,9 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract inserted values
|
||||||
|
*/
|
||||||
static char* get_value(char* data, uint32_t datalen, char** dest, uint32_t* destlen)
|
static char* get_value(char* data, uint32_t datalen, char** dest, uint32_t* destlen)
|
||||||
{
|
{
|
||||||
char *value_start = strnchr_esc_mysql(data, '(', datalen);
|
char *value_start = strnchr_esc_mysql(data, '(', datalen);
|
||||||
@ -378,10 +381,18 @@ static char* get_value(char* data, uint32_t datalen, char** dest, uint32_t* dest
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Convert an INSERT query into a CSV stream
|
||||||
|
*
|
||||||
|
* @param buffer Buffer containing the query
|
||||||
|
* @param packet_num The current packet sequence number
|
||||||
|
*
|
||||||
|
* @return The modified buffer
|
||||||
|
*/
|
||||||
static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num)
|
static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num)
|
||||||
{
|
{
|
||||||
/** Remove the INSERT INTO ... from the buffer */
|
/** Remove the INSERT INTO ... from the buffer */
|
||||||
char *dataptr = (char*) GWBUF_DATA(buffer);
|
char *dataptr = (char*)GWBUF_DATA(buffer);
|
||||||
char *modptr = strnchr_esc_mysql(dataptr + 5, '(', GWBUF_LENGTH(buffer));
|
char *modptr = strnchr_esc_mysql(dataptr + 5, '(', GWBUF_LENGTH(buffer));
|
||||||
|
|
||||||
/** Leave some space for the header so we don't have to allocate a new one */
|
/** Leave some space for the header so we don't have to allocate a new one */
|
||||||
@ -392,10 +403,12 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num)
|
|||||||
char* value;
|
char* value;
|
||||||
uint32_t valuesize;
|
uint32_t valuesize;
|
||||||
|
|
||||||
/** Remove the parentheses from the insert and recalculate the packet length */
|
/**
|
||||||
|
* Remove the parentheses from the insert, add newlines between values and
|
||||||
|
* recalculate the packet length
|
||||||
|
*/
|
||||||
while ((dataptr = get_value(dataptr, end - dataptr, &value, &valuesize)))
|
while ((dataptr = get_value(dataptr, end - dataptr, &value, &valuesize)))
|
||||||
{
|
{
|
||||||
// TODO: Don't move everything, only move the needed parts
|
|
||||||
memmove(store_end, value, valuesize);
|
memmove(store_end, value, valuesize);
|
||||||
store_end += valuesize;
|
store_end += valuesize;
|
||||||
*store_end++ = '\n';
|
*store_end++ = '\n';
|
||||||
@ -412,9 +425,20 @@ static GWBUF* convert_to_stream(GWBUF* buffer, uint8_t packet_num)
|
|||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Handle replies from the backend
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @param instance Filter instance
|
||||||
|
* @param session Filter session
|
||||||
|
* @param reply The reply from the backend
|
||||||
|
*
|
||||||
|
* @return 1 on success, 0 on error
|
||||||
|
*/
|
||||||
static int clientReply(FILTER* instance, void *session, GWBUF *reply)
|
static int clientReply(FILTER* instance, void *session, GWBUF *reply)
|
||||||
{
|
{
|
||||||
DS_SESSION *my_session = (DS_SESSION*) session;
|
DS_SESSION *my_session = (DS_SESSION*) session;
|
||||||
|
int rc = 1;
|
||||||
|
|
||||||
if (my_session->state == DS_CLOSING_STREAM ||
|
if (my_session->state == DS_CLOSING_STREAM ||
|
||||||
(my_session->state == DS_REQUEST_SENT &&
|
(my_session->state == DS_REQUEST_SENT &&
|
||||||
@ -440,11 +464,11 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return my_session->up.clientReply(my_session->up.instance,
|
rc = my_session->up.clientReply(my_session->up.instance,
|
||||||
my_session->up.session, reply);
|
my_session->up.session, reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -458,8 +482,7 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply)
|
|||||||
* @param fsession Filter session, may be NULL
|
* @param fsession Filter session, may be NULL
|
||||||
* @param dcb The DCB for diagnostic output
|
* @param dcb The DCB for diagnostic output
|
||||||
*/
|
*/
|
||||||
static void
|
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
||||||
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
|
||||||
{
|
{
|
||||||
DS_INSTANCE *my_instance = (DS_INSTANCE *) instance;
|
DS_INSTANCE *my_instance = (DS_INSTANCE *) instance;
|
||||||
|
|
||||||
@ -473,11 +496,23 @@ diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Get filter capabilities
|
||||||
|
*
|
||||||
|
* @return Filter capabilities
|
||||||
|
*/
|
||||||
static uint64_t getCapabilities(void)
|
static uint64_t getCapabilities(void)
|
||||||
{
|
{
|
||||||
return RCAP_TYPE_TRANSACTION_TRACKING;
|
return RCAP_TYPE_TRANSACTION_TRACKING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Check if an insert statement has implicitly ordered values
|
||||||
|
*
|
||||||
|
* @param buffer Buffer to check
|
||||||
|
*
|
||||||
|
* @return True if the insert does not define the order of the values
|
||||||
|
*/
|
||||||
static bool only_implicit_values(GWBUF *buffer)
|
static bool only_implicit_values(GWBUF *buffer)
|
||||||
{
|
{
|
||||||
bool rval = false;
|
bool rval = false;
|
||||||
@ -512,10 +547,12 @@ static bool only_implicit_values(GWBUF *buffer)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if a buffer contains an insert statement
|
* @brief Extract insert target
|
||||||
*
|
*
|
||||||
* @param buffer Buffer to analyze
|
* @param buffer Buffer to analyze
|
||||||
* @return True if the buffer contains an insert statement
|
*
|
||||||
|
* @return True if the buffer contains an insert statement and the target table
|
||||||
|
* was successfully extracted
|
||||||
*/
|
*/
|
||||||
static bool extract_insert_target(GWBUF *buffer, char* target, int len)
|
static bool extract_insert_target(GWBUF *buffer, char* target, int len)
|
||||||
{
|
{
|
||||||
@ -547,6 +584,13 @@ static bool extract_insert_target(GWBUF *buffer, char* target, int len)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Create a LOAD DATA LOCAL INFILE statement
|
||||||
|
*
|
||||||
|
* @param target The table name where the data is loaded
|
||||||
|
*
|
||||||
|
* @return Buffer containing the statement or NULL if memory allocation failed
|
||||||
|
*/
|
||||||
static GWBUF* create_load_data_command(const char *target)
|
static GWBUF* create_load_data_command(const char *target)
|
||||||
{
|
{
|
||||||
char str[sizeof(load_data_template) + strlen(target) + 1];
|
char str[sizeof(load_data_template) + strlen(target) + 1];
|
||||||
|
Reference in New Issue
Block a user