Reindented server/core/resultset.c

This commit is contained in:
Johan Wikman
2015-11-30 16:13:32 +02:00
parent 6c401b9085
commit 0aa38cad4c
2 changed files with 349 additions and 299 deletions

View File

@ -22,8 +22,8 @@
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 17/02/15 Mark Riddoch Initial implementation * 17/02/15 Mark Riddoch Initial implementation
* *
* @endverbatim * @endverbatim
*/ */
@ -43,127 +43,137 @@ static int mysql_send_row(DCB *, RESULT_ROW *, int);
/** /**
* Create a generic result set * Create a generic result set
* *
* @param func Function to call for each row * @param func Function to call for each row
* @param data Data to pass to the row retrieval function * @param data Data to pass to the row retrieval function
* @return An empty resultset or NULL on error * @return An empty resultset or NULL on error
*/ */
RESULTSET * RESULTSET *
resultset_create(RESULT_ROW_CB func, void *data) resultset_create(RESULT_ROW_CB func, void *data)
{ {
RESULTSET *rval; RESULTSET *rval;
if ((rval = (RESULTSET *)malloc(sizeof(RESULTSET))) != NULL) if ((rval = (RESULTSET *)malloc(sizeof(RESULTSET))) != NULL)
{ {
rval->n_cols = 0; rval->n_cols = 0;
rval->column = NULL; rval->column = NULL;
rval->userdata = data; rval->userdata = data;
rval->fetchrow = func; rval->fetchrow = func;
} }
return rval; return rval;
} }
/** /**
* Free a previously allocated resultset * Free a previously allocated resultset
* *
* @param resultset The result set to free * @param resultset The result set to free
*/ */
void void
resultset_free(RESULTSET *resultset) resultset_free(RESULTSET *resultset)
{ {
RESULT_COLUMN *col; RESULT_COLUMN *col;
if (resultset != NULL) if (resultset != NULL)
{ {
col = resultset->column; col = resultset->column;
while (col) while (col)
{ {
RESULT_COLUMN *next; RESULT_COLUMN *next;
next = col->next; next = col->next;
resultset_column_free(col); resultset_column_free(col);
col = next; col = next;
} }
free(resultset); free(resultset);
} }
} }
/** /**
* Add a new column to a result set. Columns are added to the right * Add a new column to a result set. Columns are added to the right
* of the result set, i.e. the existing order is maintained. * of the result set, i.e. the existing order is maintained.
* *
* @param set The result set * @param set The result set
* @param name The column name * @param name The column name
* @param len The column length * @param len The column length
* @param type The column type * @param type The column type
* @return The numebr of columns added to the result set * @return The numebr of columns added to the result set
*/ */
int int
resultset_add_column(RESULTSET *set, char *name, int len, RESULT_COL_TYPE type) resultset_add_column(RESULTSET *set, char *name, int len, RESULT_COL_TYPE type)
{ {
RESULT_COLUMN *newcol, *ptr; RESULT_COLUMN *newcol, *ptr;
if ((newcol = (RESULT_COLUMN *)malloc(sizeof(RESULT_COLUMN))) == NULL) if ((newcol = (RESULT_COLUMN *)malloc(sizeof(RESULT_COLUMN))) == NULL)
return 0; {
if ((newcol->name = strdup(name)) == NULL) return 0;
{ }
free(newcol); if ((newcol->name = strdup(name)) == NULL)
return 0; {
} free(newcol);
newcol->type = type; return 0;
newcol->len = len; }
newcol->next = NULL; newcol->type = type;
newcol->len = len;
newcol->next = NULL;
if (set->column == NULL) if (set->column == NULL)
set->column = newcol; {
else set->column = newcol;
{ }
ptr = set->column; else
while (ptr->next) {
ptr = ptr->next; ptr = set->column;
ptr->next = newcol; while (ptr->next)
} {
set->n_cols++; ptr = ptr->next;
return 1; }
ptr->next = newcol;
}
set->n_cols++;
return 1;
} }
/** /**
* Free a result set column * Free a result set column
* *
* @param col Column to free * @param col Column to free
*/ */
void void
resultset_column_free(RESULT_COLUMN *col) resultset_column_free(RESULT_COLUMN *col)
{ {
free(col->name); free(col->name);
free(col); free(col);
} }
/** /**
* Create a blank row, a row with all values NULL, for a result * Create a blank row, a row with all values NULL, for a result
* set. * set.
* *
* @param set The result set the row will be part of * @param set The result set the row will be part of
* @return The NULL result set row * @return The NULL result set row
*/ */
RESULT_ROW * RESULT_ROW *
resultset_make_row(RESULTSET *set) resultset_make_row(RESULTSET *set)
{ {
RESULT_ROW *row; RESULT_ROW *row;
int i; int i;
if ((row = (RESULT_ROW *)malloc(sizeof(RESULT_ROW))) == NULL) if ((row = (RESULT_ROW *)malloc(sizeof(RESULT_ROW))) == NULL)
return NULL; {
row->n_cols = set->n_cols; return NULL;
if ((row->cols = (char **)malloc(row->n_cols * sizeof(char *))) == NULL) }
{ row->n_cols = set->n_cols;
free(row); if ((row->cols = (char **)malloc(row->n_cols * sizeof(char *))) == NULL)
return NULL; {
} free(row);
return NULL;
}
for (i = 0; i < set->n_cols; i++) for (i = 0; i < set->n_cols; i++)
row->cols[i] = NULL; {
return row; row->cols[i] = NULL;
}
return row;
} }
/** /**
@ -172,45 +182,55 @@ int i;
* If any value is not a malloc'd pointer it should be removed before * If any value is not a malloc'd pointer it should be removed before
* making this call. * making this call.
* *
* @param row The row to free * @param row The row to free
*/ */
void void
resultset_free_row(RESULT_ROW *row) resultset_free_row(RESULT_ROW *row)
{ {
int i; int i;
for (i = 0; i < row->n_cols; i++) for (i = 0; i < row->n_cols; i++)
if (row->cols[i]) {
free(row->cols[i]); if (row->cols[i])
free(row->cols); {
free(row); free(row->cols[i]);
}
}
free(row->cols);
free(row);
} }
/** /**
* Add a value in a particular column of the row . The value is * Add a value in a particular column of the row . The value is
* a NULL terminated string and will be copied into malloc'd * a NULL terminated string and will be copied into malloc'd
* storage by this routine. * storage by this routine.
* *
* @param row The row ro add the column into * @param row The row ro add the column into
* @param col The column number (0 to n_cols - 1) * @param col The column number (0 to n_cols - 1)
* @param value The column value, may be NULL * @param value The column value, may be NULL
* @return The number of columns inserted * @return The number of columns inserted
*/ */
int int
resultset_row_set(RESULT_ROW *row, int col, char *value) resultset_row_set(RESULT_ROW *row, int col, char *value)
{ {
if (col < 0 || col >= row->n_cols) if (col < 0 || col >= row->n_cols)
return 0; {
if (value) return 0;
{ }
if ((row->cols[col] = strdup(value)) == NULL) if (value)
return 0; {
return 1; if ((row->cols[col] = strdup(value)) == NULL)
} {
else if (row->cols[col]) return 0;
free(row->cols[col]); }
row->cols[col] = NULL; return 1;
return 1; }
else if (row->cols[col])
{
free(row->cols[col]);
}
row->cols[col] = NULL;
return 1;
} }
/** /**
@ -218,140 +238,152 @@ resultset_row_set(RESULT_ROW *row, int col, char *value)
* set. Each row is retrieved by calling the function passed in the * set. Each row is retrieved by calling the function passed in the
* argument list. * argument list.
* *
* @param set The result set to stream * @param set The result set to stream
* @param dcb The connection to stream the result set to * @param dcb The connection to stream the result set to
*/ */
void void
resultset_stream_mysql(RESULTSET *set, DCB *dcb) resultset_stream_mysql(RESULTSET *set, DCB *dcb)
{ {
RESULT_COLUMN *col; RESULT_COLUMN *col;
RESULT_ROW *row; RESULT_ROW *row;
uint8_t seqno = 2; uint8_t seqno = 2;
mysql_send_fieldcount(dcb, set->n_cols); mysql_send_fieldcount(dcb, set->n_cols);
col = set->column; col = set->column;
while (col) while (col)
{ {
mysql_send_columndef(dcb, col->name, col->type, col->len, seqno++); mysql_send_columndef(dcb, col->name, col->type, col->len, seqno++);
col = col->next; col = col->next;
} }
mysql_send_eof(dcb, seqno++); mysql_send_eof(dcb, seqno++);
while ((row = (*set->fetchrow)(set, set->userdata)) != NULL) while ((row = (*set->fetchrow)(set, set->userdata)) != NULL)
{ {
mysql_send_row(dcb, row, seqno++); mysql_send_row(dcb, row, seqno++);
resultset_free_row(row); resultset_free_row(row);
} }
mysql_send_eof(dcb, seqno); mysql_send_eof(dcb, seqno);
} }
/** /**
* Send the field count packet in a response packet sequence. * Send the field count packet in a response packet sequence.
* *
* @param dcb DCB of connection to send result set to * @param dcb DCB of connection to send result set to
* @param count Number of columns in the result set * @param count Number of columns in the result set
* @return Non-zero on success * @return Non-zero on success
*/ */
static int static int
mysql_send_fieldcount(DCB *dcb, int count) mysql_send_fieldcount(DCB *dcb, int count)
{ {
GWBUF *pkt; GWBUF *pkt;
uint8_t *ptr; uint8_t *ptr;
if ((pkt = gwbuf_alloc(5)) == NULL) if ((pkt = gwbuf_alloc(5)) == NULL)
return 0; {
ptr = GWBUF_DATA(pkt); return 0;
*ptr++ = 0x01; // Payload length }
*ptr++ = 0x00; ptr = GWBUF_DATA(pkt);
*ptr++ = 0x00; *ptr++ = 0x01; // Payload length
*ptr++ = 0x01; // Sequence number in response *ptr++ = 0x00;
*ptr++ = count; // Length of result string *ptr++ = 0x00;
return dcb->func.write(dcb, pkt); *ptr++ = 0x01; // Sequence number in response
*ptr++ = count; // Length of result string
return dcb->func.write(dcb, pkt);
} }
/** /**
* Send the column definition packet in a response packet sequence. * Send the column definition packet in a response packet sequence.
* *
* @param dcb The DCB of the connection * @param dcb The DCB of the connection
* @param name Name of the column * @param name Name of the column
* @param type Column type * @param type Column type
* @param len Column length * @param len Column length
* @param seqno Packet sequence number * @param seqno Packet sequence number
* @return Non-zero on success * @return Non-zero on success
*/ */
static int static int
mysql_send_columndef(DCB *dcb, char *name, int type, int len, uint8_t seqno) mysql_send_columndef(DCB *dcb, char *name, int type, int len, uint8_t seqno)
{ {
GWBUF *pkt; GWBUF *pkt;
uint8_t *ptr; uint8_t *ptr;
int plen; int plen;
if ((pkt = gwbuf_alloc(26 + strlen(name))) == NULL) if ((pkt = gwbuf_alloc(26 + strlen(name))) == NULL)
return 0; {
ptr = GWBUF_DATA(pkt); return 0;
plen = 22 + strlen(name); }
*ptr++ = plen & 0xff; ptr = GWBUF_DATA(pkt);
*ptr++ = (plen >> 8) & 0xff; plen = 22 + strlen(name);
*ptr++ = (plen >> 16)& 0xff; *ptr++ = plen & 0xff;
*ptr++ = seqno; // Sequence number in response *ptr++ = (plen >> 8) & 0xff;
*ptr++ = 3; // Catalog is always def *ptr++ = (plen >> 16)& 0xff;
*ptr++ = 'd'; *ptr++ = seqno; // Sequence number in response
*ptr++ = 'e'; *ptr++ = 3; // Catalog is always def
*ptr++ = 'f'; *ptr++ = 'd';
*ptr++ = 0; // Schema name length *ptr++ = 'e';
*ptr++ = 0; // virtual table name length *ptr++ = 'f';
*ptr++ = 0; // Table name length *ptr++ = 0; // Schema name length
*ptr++ = strlen(name); // Column name length; *ptr++ = 0; // virtual table name length
while (*name) *ptr++ = 0; // Table name length
*ptr++ = *name++; // Copy the column name *ptr++ = strlen(name); // Column name length;
*ptr++ = 0; // Orginal column name while (*name)
*ptr++ = 0x0c; // Length of next fields always 12 {
*ptr++ = 0x3f; // Character set *ptr++ = *name++; // Copy the column name
*ptr++ = 0; }
*ptr++ = len & 0xff; // Length of column *ptr++ = 0; // Orginal column name
*ptr++ = (len >> 8) & 0xff; *ptr++ = 0x0c; // Length of next fields always 12
*ptr++ = (len >> 16) & 0xff; *ptr++ = 0x3f; // Character set
*ptr++ = (len >> 24) & 0xff; *ptr++ = 0;
*ptr++ = type; *ptr++ = len & 0xff; // Length of column
*ptr++ = 0x81; // Two bytes of flags *ptr++ = (len >> 8) & 0xff;
if (type == 0xfd) *ptr++ = (len >> 16) & 0xff;
*ptr++ = 0x1f; *ptr++ = (len >> 24) & 0xff;
else *ptr++ = type;
*ptr++ = 0x00; *ptr++ = 0x81; // Two bytes of flags
*ptr++= 0; if (type == 0xfd)
*ptr++= 0; {
*ptr++= 0; *ptr++ = 0x1f;
return dcb->func.write(dcb, pkt); }
else
{
*ptr++ = 0x00;
}
*ptr++= 0;
*ptr++= 0;
*ptr++= 0;
return dcb->func.write(dcb, pkt);
} }
/** /**
* Send an EOF packet in a response packet sequence. * Send an EOF packet in a response packet sequence.
* *
* @param dcb The client connection * @param dcb The client connection
* @param seqno The sequence number of the EOF packet * @param seqno The sequence number of the EOF packet
* @return Non-zero on success * @return Non-zero on success
*/ */
static int static int
mysql_send_eof(DCB *dcb, int seqno) mysql_send_eof(DCB *dcb, int seqno)
{ {
GWBUF *pkt; GWBUF *pkt;
uint8_t *ptr; uint8_t *ptr;
if ((pkt = gwbuf_alloc(9)) == NULL) if ((pkt = gwbuf_alloc(9)) == NULL)
return 0; {
ptr = GWBUF_DATA(pkt); return 0;
*ptr++ = 0x05; }
*ptr++ = 0x00; ptr = GWBUF_DATA(pkt);
*ptr++ = 0x00; *ptr++ = 0x05;
*ptr++ = seqno; // Sequence number in response *ptr++ = 0x00;
*ptr++ = 0xfe; // Length of result string *ptr++ = 0x00;
*ptr++ = 0x00; // No Errors *ptr++ = seqno; // Sequence number in response
*ptr++ = 0x00; *ptr++ = 0xfe; // Length of result string
*ptr++ = 0x02; // Autocommit enabled *ptr++ = 0x00; // No Errors
*ptr++ = 0x00; *ptr++ = 0x00;
return dcb->func.write(dcb, pkt); *ptr++ = 0x02; // Autocommit enabled
*ptr++ = 0x00;
return dcb->func.write(dcb, pkt);
} }
@ -359,67 +391,73 @@ uint8_t *ptr;
/** /**
* Send a row packet in a response packet sequence. * Send a row packet in a response packet sequence.
* *
* @param dcb The client connection * @param dcb The client connection
* @param row The row to send * @param row The row to send
* @param seqno The sequence number of the EOF packet * @param seqno The sequence number of the EOF packet
* @return Non-zero on success * @return Non-zero on success
*/ */
static int static int
mysql_send_row(DCB *dcb, RESULT_ROW *row, int seqno) mysql_send_row(DCB *dcb, RESULT_ROW *row, int seqno)
{ {
GWBUF *pkt; GWBUF *pkt;
int i, len = 4; int i, len = 4;
uint8_t *ptr; uint8_t *ptr;
for (i = 0; i < row->n_cols; i++) for (i = 0; i < row->n_cols; i++)
{ {
if (row->cols[i]) if (row->cols[i])
len += strlen(row->cols[i]); {
len++; len += strlen(row->cols[i]);
} }
len++;
}
if ((pkt = gwbuf_alloc(len)) == NULL) if ((pkt = gwbuf_alloc(len)) == NULL)
return 0; {
ptr = GWBUF_DATA(pkt); return 0;
len -= 4; }
*ptr++ = len & 0xff; ptr = GWBUF_DATA(pkt);
*ptr++ = (len >> 8) & 0xff; len -= 4;
*ptr++ = (len >> 16) & 0xff; *ptr++ = len & 0xff;
*ptr++ = seqno; *ptr++ = (len >> 8) & 0xff;
for (i = 0; i < row->n_cols; i++) *ptr++ = (len >> 16) & 0xff;
{ *ptr++ = seqno;
if (row->cols[i]) for (i = 0; i < row->n_cols; i++)
{ {
len = strlen(row->cols[i]); if (row->cols[i])
*ptr++ = len; {
strncpy((char *)ptr, row->cols[i], len); len = strlen(row->cols[i]);
ptr += len; *ptr++ = len;
} strncpy((char *)ptr, row->cols[i], len);
else ptr += len;
{ }
*ptr++ = 0; // NULL column else
} {
} *ptr++ = 0; // NULL column
}
}
return dcb->func.write(dcb, pkt); return dcb->func.write(dcb, pkt);
} }
/** /**
* Return true if the string only contains numerics * Return true if the string only contains numerics
* *
* @param value String to test * @param value String to test
* @return Non-zero if the string is made of of numeric values * @return Non-zero if the string is made of of numeric values
*/ */
static int static int
value_is_numeric(char *value) value_is_numeric(char *value)
{ {
while (*value) while (*value)
{ {
if (!isdigit(*value)) if (!isdigit(*value))
return 0; {
value++; return 0;
} }
return 1; value++;
}
return 1;
} }
/** /**
@ -427,42 +465,50 @@ value_is_numeric(char *value)
* Each row is retrieved by calling the function passed in the * Each row is retrieved by calling the function passed in the
* argument list. * argument list.
* *
* @param set The result set to stream * @param set The result set to stream
* @param dcb The connection to stream the result set to * @param dcb The connection to stream the result set to
*/ */
void void
resultset_stream_json(RESULTSET *set, DCB *dcb) resultset_stream_json(RESULTSET *set, DCB *dcb)
{ {
RESULT_COLUMN *col; RESULT_COLUMN *col;
RESULT_ROW *row; RESULT_ROW *row;
int rowno = 0; int rowno = 0;
dcb_printf(dcb, "[ ");
dcb_printf(dcb, "[ "); while ((row = (*set->fetchrow)(set, set->userdata)) != NULL)
while ((row = (*set->fetchrow)(set, set->userdata)) != NULL) {
{ int i = 0;
int i = 0; if (rowno++ > 0)
if (rowno++ > 0) {
dcb_printf(dcb, ",\n"); dcb_printf(dcb, ",\n");
dcb_printf(dcb, "{ "); }
col = set->column; dcb_printf(dcb, "{ ");
while (col) col = set->column;
{ while (col)
{
dcb_printf(dcb, "\"%s\" : ", col->name); dcb_printf(dcb, "\"%s\" : ", col->name);
if (row->cols[i] && value_is_numeric(row->cols[i])) if (row->cols[i] && value_is_numeric(row->cols[i]))
dcb_printf(dcb, "%s", row->cols[i]); {
else if (row->cols[i]) dcb_printf(dcb, "%s", row->cols[i]);
dcb_printf(dcb, "\"%s\"", row->cols[i]); }
else else if (row->cols[i])
dcb_printf(dcb, "NULL"); {
i++; dcb_printf(dcb, "\"%s\"", row->cols[i]);
col = col->next; }
if (col) else
dcb_printf(dcb, ", "); {
} dcb_printf(dcb, "NULL");
resultset_free_row(row); }
dcb_printf(dcb, "}"); i++;
} col = col->next;
dcb_printf(dcb, "]\n"); if (col)
{
dcb_printf(dcb, ", ");
}
}
resultset_free_row(row);
dcb_printf(dcb, "}");
}
dcb_printf(dcb, "]\n");
} }

View File

@ -24,8 +24,8 @@
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 17/02/15 Mark Riddoch Initial implementation * 17/02/15 Mark Riddoch Initial implementation
* *
* @endverbatim * @endverbatim
*/ */
@ -35,28 +35,31 @@
/** /**
* Column types * Column types
*/ */
typedef enum { typedef enum
COL_TYPE_VARCHAR = 0x0f, {
COL_TYPE_VARSTRING = 0xfd COL_TYPE_VARCHAR = 0x0f,
COL_TYPE_VARSTRING = 0xfd
} RESULT_COL_TYPE; } RESULT_COL_TYPE;
/** /**
* The result set column definition. Each result set has an order linked * The result set column definition. Each result set has an order linked
* list of column definitions. * list of column definitions.
*/ */
typedef struct resultcolumn { typedef struct resultcolumn
char *name; /*< Column name */ {
int len; /*< Column length */ char *name; /*< Column name */
RESULT_COL_TYPE type; /*< Column type */ int len; /*< Column length */
struct resultcolumn *next; /*< next column */ RESULT_COL_TYPE type; /*< Column type */
struct resultcolumn *next; /*< next column */
} RESULT_COLUMN; } RESULT_COLUMN;
/** /**
* A representation of a row within a result set. * A representation of a row within a result set.
*/ */
typedef struct resultrow { typedef struct resultrow
int n_cols; /*< No. of columns in row */ {
char **cols; /*< The columns themselves */ int n_cols; /*< No. of columns in row */
char **cols; /*< The columns themselves */
} RESULT_ROW; } RESULT_ROW;
struct resultset; struct resultset;
@ -70,19 +73,20 @@ typedef RESULT_ROW * (*RESULT_ROW_CB)(struct resultset *, void *);
* The representation of the result set itself. * The representation of the result set itself.
*/ */
typedef struct resultset { typedef struct resultset {
int n_cols; /*< No. of columns */ int n_cols; /*< No. of columns */
RESULT_COLUMN *column; /*< Linked list of column definitions */ RESULT_COLUMN *column; /*< Linked list of column definitions */
RESULT_ROW_CB fetchrow; /*< Fetch a row for the result set */ RESULT_ROW_CB fetchrow; /*< Fetch a row for the result set */
void *userdata; /*< User data for the fetch row call */ void *userdata; /*< User data for the fetch row call */
} RESULTSET; } RESULTSET;
extern RESULTSET *resultset_create(RESULT_ROW_CB, void *); extern RESULTSET *resultset_create(RESULT_ROW_CB, void *);
extern void resultset_free(RESULTSET *); extern void resultset_free(RESULTSET *);
extern int resultset_add_column(RESULTSET *, char *, int, RESULT_COL_TYPE); extern int resultset_add_column(RESULTSET *, char *, int, RESULT_COL_TYPE);
extern void resultset_column_free(RESULT_COLUMN *); extern void resultset_column_free(RESULT_COLUMN *);
extern RESULT_ROW *resultset_make_row(RESULTSET *); extern RESULT_ROW *resultset_make_row(RESULTSET *);
extern void resultset_free_row(RESULT_ROW *); extern void resultset_free_row(RESULT_ROW *);
extern int resultset_row_set(RESULT_ROW *, int, char *); extern int resultset_row_set(RESULT_ROW *, int, char *);
extern void resultset_stream_mysql(RESULTSET *, DCB *); extern void resultset_stream_mysql(RESULTSET *, DCB *);
extern void resultset_stream_json(RESULTSET *, DCB *); extern void resultset_stream_json(RESULTSET *, DCB *);
#endif #endif