First working implementation of the hierarchical router.

This commit is contained in:
Markus Makela
2015-01-25 21:24:22 +02:00
parent 705328012c
commit 08dd4d7053
2 changed files with 89 additions and 341 deletions

View File

@ -171,6 +171,7 @@ struct router_client_session {
int rses_versno; /*< even = no active update, else odd. not used 4/14 */
bool rses_closed; /*< true when closeSession is called */
DCB* rses_client_dcb;
DCB* dummy_dcb; /* DCB used to send the client write messages from the router itself */
MYSQL_session* rses_mysql_session;
/** Properties listed by their type */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT];
@ -185,7 +186,6 @@ struct router_client_session {
SUBSERVICE* *subservice;
int n_subservice;
bool hash_init;
GWBUF* queue;
SESSION* session;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;

View File

@ -66,6 +66,10 @@ extern __thread log_info_t tls_log_info;
static char *version_str = "V1.0.0";
static int filterReply (FILTER* instance, void *session, GWBUF *reply);
static void dummyDiagnostic(FILTER *instance, void *session, DCB *dcb)
{
return;
}
static void dummySetUpstream(FILTER *instance, void *fsession, UPSTREAM *downstream)
{
return;
@ -79,7 +83,7 @@ static FILTER_OBJECT dummyObject = {
dummySetUpstream,
NULL,
filterReply,
NULL,
dummyDiagnostic,
};
static ROUTER* createInstance(SERVICE *service, char **options);
@ -252,6 +256,7 @@ hashcmpfun(
return strcmp(i1, i2);
}
/*
static void*
hstrdup(void* fval)
@ -266,6 +271,7 @@ hfree(void* fval)
free(fval);
return NULL;
}
*/
bool
parse_mapping_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
@ -434,11 +440,13 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
SUBSERVICE* subsvc;
int i,rv = 1;
bool mapped = true;
sescmd_cursor_t* scur;
if(!rses_begin_locked_router_action(rses))
{
return 0;
}
subsvc = get_subsvc_from_ses(rses,session);
subsvc_clear_state(subsvc,SUBSVC_WAITING_RESULT|SUBSVC_QUERY_ACTIVE);
if(!rses->hash_init)
@ -453,28 +461,60 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
mapped = false;
break;
}
}
gwbuf_free(reply);
if(mapped)
{
rses->hash_init = true;
if(rses->queue){
GWBUF* tmp = rses->queue;
rses->queue = NULL;
rses_end_locked_router_action(rses);
rv = routeQuery((ROUTER*)rses->router, (void*)rses, tmp);
return rv;
}
rses->hash_init = true;
}
goto retblock;
}
scur = subsvc->scur;
if(sescmd_cursor_is_active(scur))
{
if(!sescmd_cursor_next(scur))
{
sescmd_cursor_set_active(scur,false);
}
else
{
execute_sescmd_in_backend(subsvc);
goto retblock;
}
}
rv = SESSION_ROUTE_REPLY(rses->session, reply);
retblock:
rses_end_locked_router_action(rses);
return rv;
}
/**
* This function reads the DCB's readqueue and sends it as a reply to the session
* who owns the DCB.
* @param dcb The dummy DCB
* @return 1 on success, 0 on failure
*/
int fakeRead(DCB* dcb)
{
if(dcb->dcb_readqueue)
{
GWBUF* tmp = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL;
return SESSION_ROUTE_REPLY(dcb->session, tmp);
}
return 1;
}
/**
* Implementation of the mandatory version entry point
*
@ -715,7 +755,10 @@ newSession(
client_rses->rses_autocommit_enabled = true;
client_rses->rses_transaction_active = false;
client_rses->session = session;
client_rses->dummy_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->dummy_dcb->func.read = fakeRead;
client_rses->dummy_dcb->state = DCB_STATE_POLLING;
client_rses->dummy_dcb->session = session;
spinlock_init(&client_rses->rses_lock);
client_rses->subservice = calloc(router->n_services, sizeof(SUBSERVICE*));
@ -735,6 +778,11 @@ newSession(
goto errorblock;
}
/* TODO: add NULL value checks */
subsvc->scur = calloc(1,sizeof(sescmd_cursor_t));
subsvc->scur->scmd_cur_rses = client_rses;
subsvc->scur->scmd_cur_ptr_property = client_rses->rses_properties;
subsvc->service = router->services[i];
subsvc->dcb =dcb_clone(client_rses->rses_client_dcb);
subsvc->session = session_alloc(subsvc->service,subsvc->dcb);
@ -762,6 +810,14 @@ newSession(
*/
atomic_add(&client_rses->rses_versno, 2);
ss_dassert(client_rses->rses_versno == 2);
client_rses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun);
hashtable_memory_fns(client_rses->dbhash, (HASHMEMORYFN) strdup,
(HASHMEMORYFN) strdup,
(HASHMEMORYFN) free,
(HASHMEMORYFN) free);
gen_tablelist(router, client_rses);
/**
* Add this session to end of the list of active sessions in router.
*/
@ -943,262 +999,13 @@ get_shard_route_target(skygw_query_type_t qtype,
}
/**
* Check if the query is a DROP TABLE... query and
* if it targets a temporary table, remove it from the hashtable.
* @param instance Router instance
* @param router_session Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
* This function creates a custom SHOW DATABASES response by iterating through
* the database names in the session's hashtable. This generates a complete list
* of all available databases in all of the clusters.
* @param router The router instance
* @param client Router client session
* @return Pointer to the generated response
*/
void
check_drop_tmp_table(
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
{
int tsize = 0, klen = 0, i;
char** tbl = NULL;
char *hkey, *dbname;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
rses_property_t* rses_prop_tmp;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
dbname = router_cli_ses->rses_mysql_session->db;
if(is_drop_table_query(querybuf))
{
tbl = skygw_get_table_names(querybuf, &tsize, false);
if(tbl != NULL)
{
for(i = 0; i < tsize; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen, sizeof(char));
strcpy(hkey, dbname);
strcat(hkey, ".");
strcat(hkey, tbl[i]);
if(rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if(hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
(void *) hkey))
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Temporary table dropped: %s", hkey)));
}
}
free(tbl[i]);
free(hkey);
}
free(tbl);
}
}
}
/**
* Check if the query targets a temporary table.
* @param instance Router instance
* @param router_session Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
* @return The type of the query
*/
skygw_query_type_t
is_read_tmp_table(
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
{
bool target_tmp_table = false;
int tsize = 0, klen = 0, i;
char** tbl = NULL;
char *hkey, *dbname;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
skygw_query_type_t qtype = type;
rses_property_t* rses_prop_tmp;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
dbname = router_cli_ses->rses_mysql_session->db;
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_LOCAL_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))
{
tbl = skygw_get_table_names(querybuf, &tsize, false);
if(tbl != NULL && tsize > 0)
{
/** Query targets at least one table */
for(i = 0; i < tsize && !target_tmp_table && tbl[i]; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen, sizeof(char));
strcpy(hkey, dbname);
strcat(hkey, ".");
strcat(hkey, tbl[i]);
if(rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if((target_tmp_table =
(bool) hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables, (void *) hkey)))
{
/**Query target is a temporary table*/
qtype = QUERY_TYPE_READ_TMP_TABLE;
LOGIF(LT,
(skygw_log_write(LOGFILE_TRACE,
"Query targets a temporary table: %s", hkey)));
}
}
free(hkey);
}
}
}
if(tbl != NULL)
{
for(i = 0; i < tsize; i++)
{
free(tbl[i]);
}
free(tbl);
}
return qtype;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first.
* @param instance Router instance
* @param router_session Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void
check_create_tmp_table(
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
{
int klen = 0;
char *hkey, *dbname;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
rses_property_t* rses_prop_tmp;
HASHTABLE* h;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
dbname = router_cli_ses->rses_mysql_session->db;
if(QUERY_IS_TYPE(type, QUERY_TYPE_CREATE_TMP_TABLE))
{
bool is_temp = true;
char* tblname = NULL;
tblname = skygw_get_created_table_name(querybuf);
if(tblname && strlen(tblname) > 0)
{
klen = strlen(dbname) + strlen(tblname) + 2;
hkey = calloc(klen, sizeof(char));
strcpy(hkey, dbname);
strcat(hkey, ".");
strcat(hkey, tblname);
}
else
{
hkey = NULL;
}
if(rses_prop_tmp == NULL)
{
if((rses_prop_tmp =
(rses_property_t*) calloc(1, sizeof(rses_property_t))))
{
#if defined(SS_DEBUG)
rses_prop_tmp->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
rses_prop_tmp->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
rses_prop_tmp->rses_prop_rsession = router_cli_ses;
rses_prop_tmp->rses_prop_refcount = 1;
rses_prop_tmp->rses_prop_next = NULL;
rses_prop_tmp->rses_prop_type = RSES_PROP_TYPE_TMPTABLES;
router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = rses_prop_tmp;
}
else
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, "Error : Call to malloc() failed.")));
}
}
if(rses_prop_tmp)
{
if(rses_prop_tmp->rses_prop_data.temp_tables == NULL)
{
h = hashtable_alloc(7, hashkeyfun, hashcmpfun);
hashtable_memory_fns(h, hstrdup, NULL, hfree, NULL);
if(h != NULL)
{
rses_prop_tmp->rses_prop_data.temp_tables = h;
}
else
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, "Error : Failed to allocate a new hashtable.")));
}
}
if(hkey && rses_prop_tmp->rses_prop_data.temp_tables &&
hashtable_add(rses_prop_tmp->rses_prop_data.temp_tables,
(void *) hkey,
(void *) is_temp) == 0) /*< Conflict in hash table */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table conflict in hashtable: %s",
hkey)));
}
#if defined(SS_DEBUG)
{
bool retkey =
hashtable_fetch(
rses_prop_tmp->rses_prop_data.temp_tables,
hkey);
if(retkey)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table added: %s",
hkey)));
}
}
#endif
}
free(hkey);
free(tblname);
}
}
GWBUF*
gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
{
@ -1234,7 +1041,7 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
0x00, 0x00,
0x00, 0x00};
#endif
/* The meanings of the "magic numbers" can be found in the COM_QUERY-response definition */
coldef_len = sizeof(catalog) + strlen(schema) + 1 +
strlen(table) + 1 +
strlen(org_table) + 1 +
@ -1362,20 +1169,16 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
* If routeQuery fails, it means that router session has failed.
* In any tolerated failure, handleError is called and if necessary,
* an error message is sent to the client.
*
* For now, routeQuery don't tolerate errors, so any error will close
* the session. vraa 14.6.14
*/
static int
routeQuery(
ROUTER* instance,
routeQuery(ROUTER* instance,
void* router_session,
GWBUF* querybuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
int ret = 1;
SUBSERVICE* target_subsvc;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *) instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
@ -1394,17 +1197,7 @@ routeQuery(
}
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
if(router_cli_ses->dbhash == NULL && !router_cli_ses->hash_init)
{
router_cli_ses->queue = querybuf;
router_cli_ses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun);
hashtable_memory_fns(router_cli_ses->dbhash, (HASHMEMORYFN) strdup,
(HASHMEMORYFN) strdup,
(HASHMEMORYFN) free,
(HASHMEMORYFN) free);
gen_tablelist(inst, router_cli_ses);
return 1;
}
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
@ -1491,60 +1284,8 @@ routeQuery(
LOGFILE_ERROR,
"Error : Changing database failed.")));
}
/* ret = 1; */
/* goto retblock; */
}
/**
* !!! Temporary tablen tutkiminen voi olla turhaa. Poista tarvittaessa.
*/
/**
* Check if the query has anything to do with temporary tables.
*/
qtype = is_read_tmp_table(instance, router_session, querybuf, qtype);
check_create_tmp_table(instance, router_session, querybuf, qtype);
check_drop_tmp_table(instance, router_session, querybuf, qtype);
/**
* !!! Transaktion tutkiminen voi olla turhaa paitsi jos haluataan
* lokittaa. Poista tarvittaessa.
*/
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
* transaction is committed and autocommit is enabled again.
*/
if(router_cli_ses->rses_autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
{
router_cli_ses->rses_autocommit_enabled = false;
if(!router_cli_ses->rses_transaction_active)
{
router_cli_ses->rses_transaction_active = true;
}
}
else if(!router_cli_ses->rses_transaction_active &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
{
router_cli_ses->rses_transaction_active = true;
}
/**
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
*/
if(router_cli_ses->rses_autocommit_enabled &&
router_cli_ses->rses_transaction_active &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_COMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_ROLLBACK)))
{
router_cli_ses->rses_transaction_active = false;
}
else if(!router_cli_ses->rses_autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
{
router_cli_ses->rses_autocommit_enabled = true;
router_cli_ses->rses_transaction_active = false;
}
if(LOG_IS_ENABLED(LOGFILE_TRACE))
{
@ -1583,7 +1324,10 @@ routeQuery(
/* TODO: generate a fake response from the backend */
route_target = TARGET_ANY;
GWBUF* dbres = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(router_cli_ses->dummy_dcb,dbres);
ret = 1;
goto retblock;
}
route_target = get_shard_route_target(qtype,
@ -1962,7 +1706,7 @@ subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state)
{
if(state == SUBSVC_WAITING_RESULT)
if(state & SUBSVC_WAITING_RESULT)
{
int prev1;
@ -2647,8 +2391,12 @@ route_session_write(
i+1 >= router_cli_ses->n_subservice ? "<" : "")));
}
scur = subsvc->scur;
scur = subsvc->scur;
/**
* Add one waiter to backend reference.
*/
@ -2958,4 +2706,4 @@ reply_error:
}
retblock:
return succp;
}
}