Refactored schemarouter to use the resultset.h data types and functions

The generated responses to SHOW DATABASES are now generated by using the resultset.h
data types and functions.
This commit is contained in:
Markus Makela
2015-11-08 16:19:46 +02:00
parent b8fda016c9
commit 1f15843d61

View File

@ -1818,7 +1818,40 @@ void check_create_tmp_table(
int cmpfn(const void* a, const void *b)
{
return strcmp(*(char**)a,*(char**)b);
return strcmp(*(char* const *)a,*(char* const *)b);
}
/** Internal structure used to stream the list of databases */
struct string_array
{
char** array;
int position;
int size;
};
/**
* Callback for the database list streaming.
* @param rset Result set which is being processed
* @param data Pointer to struct string_array containing the database names
* @return New resultset row or NULL if no more data is available. If memory allocation
* failed, NULL is returned.
*/
RESULT_ROW *result_set_cb(struct resultset * rset, void *data)
{
RESULT_ROW *row = NULL;
struct string_array *strarray = (struct string_array*) data;
ss_dassert(strarray->position < strarray->size);
if ((row = resultset_make_row(rset)))
{
if (resultset_row_set(row, 0, strarray->array[strarray->position++]) == 0)
{
resultset_free_row(row);
row = NULL;
}
}
return row;
}
/**
@ -1827,190 +1860,48 @@ int cmpfn(const void* a, const void *b)
* in it.
* @param router Router instance
* @param client Router client session
* @return Pointer to new GWBUF containing the custom result set
* @return True if the sending of the database list was successful, otherwise false
*/
GWBUF*
gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
bool send_database_list(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
{
GWBUF* rval = NULL;
backend_ref_t *bref = client->rses_backend_ref;
BACKEND** backends = router->servers;
unsigned int coldef_len = 0;
int i;
char dbname[MYSQL_DATABASE_MAXLEN + 1];
char *value;
unsigned char* ptr;
char catalog[4] = {0x03, 'd', 'e', 'f'};
const char* schema = "information_schema";
const char* table = "SCHEMATA";
const char* org_table = "SCHEMATA";
const char* name = "Database";
const char* org_name = "SCHEMA_NAME";
char next_length = 0x0c;
char charset[2] = {0x21, 0x00};
char column_length[4] = {MYSQL_DATABASE_MAXLEN,
MYSQL_DATABASE_MAXLEN >> 8,
MYSQL_DATABASE_MAXLEN >> 16,
MYSQL_DATABASE_MAXLEN >> 24};
char column_type = 0xfd;
char eof[9] = {0x05, 0x00, 0x00,
0x03, 0xfe, 0x00,
0x00, 0x22, 0x00};
#if defined(NOT_USED)
char ok_packet[11] = {0x07, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00};
#endif
coldef_len = sizeof(catalog) + strlen(schema) + 1 +
strlen(table) + 1 +
strlen(org_table) + 1 +
strlen(name) + 1 +
strlen(org_name) + 1 +
1 + 2 + 4 + 1 + 2 + 1 + 2;
rval = gwbuf_alloc(5 + 4 + coldef_len + sizeof(eof));
ptr = rval->start;
/**First packet*/
*ptr++ = 0x01;
*ptr++ = 0x00;
*ptr++ = 0x00;
*ptr++ = 0x01;
*ptr++ = 0x01;
/**Second packet containing the column definitions*/
*ptr++ = coldef_len;
*ptr++ = coldef_len >> 8;
*ptr++ = coldef_len >> 16;
*ptr++ = 0x02;
memcpy((void*) ptr, catalog, 4);
ptr += 4;
*ptr++ = strlen(schema);
memcpy((void*) ptr, schema, strlen(schema));
ptr += strlen(schema);
*ptr++ = strlen(table);
memcpy((void*) ptr, table, strlen(table));
ptr += strlen(table);
*ptr++ = strlen(org_table);
memcpy((void*) ptr, org_table, strlen(org_table));
ptr += strlen(org_table);
*ptr++ = strlen(name);
memcpy((void*) ptr, name, strlen(name));
ptr += strlen(name);
*ptr++ = strlen(org_name);
memcpy((void*) ptr, org_name, strlen(org_name));
ptr += strlen(org_name);
*ptr++ = next_length;
*ptr++ = charset[0];
*ptr++ = charset[1];
*ptr++ = column_length[0];
*ptr++ = column_length[1];
*ptr++ = column_length[2];
*ptr++ = column_length[3];
*ptr++ = column_type;
*ptr++ = 0x01;
memset(ptr, 0, 4);
ptr += 4;
memcpy(ptr, eof, sizeof(eof));
unsigned int packet_num = 4;
int j = 0,ndbs = 0, bufsz = 10;
char** dbs;
if((dbs = malloc(sizeof(char*) * bufsz)) == NULL)
{
gwbuf_free(rval);
return NULL;
}
bool rval = false;
spinlock_acquire(&client->shardmap->lock);
if (client->shardmap->state == SHMAP_READY)
{
HASHTABLE* ht = client->shardmap->hash;
HASHITERATOR* iter = hashtable_iterator(ht);
struct string_array strarray;
const int size = hashtable_size(client->shardmap->hash);
strarray.array = malloc(size * sizeof(char*));
strarray.position = 0;
HASHITERATOR *iter = hashtable_iterator(client->shardmap->hash);
RESULTSET* resultset = resultset_create(result_set_cb, &strarray);
while((value = (char*) hashtable_next(iter)))
if (strarray.array && iter && resultset)
{
char* bend = hashtable_fetch(ht, value);
for(i = 0; backends[i]; i++)
char *key;
int i = 0;
while ((key = hashtable_next(iter)))
{
if(strcmp(bref[i].bref_backend->backend_server->unique_name, bend) == 0 &&
BREF_IS_IN_USE(&bref[i]) && !BREF_IS_CLOSED(&bref[i]))
char *value = hashtable_fetch(client->shardmap->hash, key);
SERVER * server = server_find_by_unique_name(value);
if (SERVER_IS_RUNNING(server))
{
ndbs++;
if(ndbs >= bufsz)
strarray.array[i++] = key;
}
}
strarray.size = i;
qsort(strarray.array, strarray.size, sizeof(char*), cmpfn);
if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN,
COL_TYPE_VARCHAR))
{
bufsz += bufsz / 2;
char** tmp = realloc(dbs,sizeof(char*) * bufsz);
if(tmp == NULL)
{
gwbuf_free(rval);
resultset_stream_mysql(resultset, client->rses_client_dcb);
rval = true;
}
}
resultset_free(resultset);
hashtable_iterator_free(iter);
for (i = 0; i < ndbs - 1; i++)
{
free(dbs[i]);
free(strarray.array);
}
free(dbs);
spinlock_release(&client->shardmap->lock);
return NULL;
}
dbs = tmp;
}
dbs[j++] = strdup(value);
}
}
}
hashtable_iterator_free(iter);
}
spinlock_release(&client->shardmap->lock);
qsort(&dbs[0],(size_t)ndbs,sizeof(char*),cmpfn);
for(j = 0;j<ndbs;j++)
{
GWBUF* temp;
int plen = strlen(dbs[j]) + 1;
sprintf(dbname, "%s", dbs[j]);
temp = gwbuf_alloc(plen + 4);
ptr = temp->start;
*ptr++ = plen;
*ptr++ = plen >> 8;
*ptr++ = plen >> 16;
*ptr++ = packet_num++;
*ptr++ = plen - 1;
memcpy(ptr, dbname, plen - 1);
/** Append the row*/
rval = gwbuf_append(rval, temp);
free(dbs[j]);
}
eof[3] = packet_num;
GWBUF* last_packet = gwbuf_alloc(sizeof(eof));
memcpy(last_packet->start, eof, sizeof(eof));
rval = gwbuf_append(rval, last_packet);
free(dbs);
return rval;
}
@ -2306,16 +2197,13 @@ static int routeQuery(
}
}
/** Create the response to the SHOW DATABASES from the mapped databases */
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_DATABASES))
{
/**
* Generate custom response that contains all the databases
*/
GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_reply,fake);
if (send_database_list(inst, router_cli_ses))
{
ret = 1;
}
goto retblock;
}