From a10aa857367f2d339e9ef242f7a986178eb90902 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 27 Mar 2017 12:01:27 +0300 Subject: [PATCH] Clean up routeQuery and clientReply Moved parts of the functionality into subfunctions. Reordered code to remove redundant logic. --- .../schemarouter/schemaroutersession.cc | 767 +++++++----------- .../schemarouter/schemaroutersession.hh | 12 +- 2 files changed, 318 insertions(+), 461 deletions(-) diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index 86e595e12..ab2c4d67d 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -68,7 +68,6 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou this->m_router = router; this->m_client = (DCB*)session->client_dcb; - this->m_queue = NULL; this->m_closed = false; this->m_sent_sescmd = 0; this->m_replied_sescmd = 0; @@ -192,8 +191,6 @@ void SchemaRouterSession::close() } } - gwbuf_free(this->m_queue); - spinlock_acquire(&m_router.m_lock); if (m_router.m_stats.longest_sescmd < this->m_stats.longest_sescmd) { @@ -217,134 +214,53 @@ void SchemaRouterSession::close() } } -int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) +static void inspect_query(GWBUF* pPacket, uint32_t* type, qc_query_op_t* op, uint8_t* command) { - uint32_t qtype = QUERY_TYPE_UNKNOWN; - uint8_t packet_type; - uint8_t* packet; - int ret = 0; - DCB* target_dcb = NULL; - bool change_successful = false; - route_target_t route_target = TARGET_UNDEFINED; - bool succp = false; - char db[MYSQL_DATABASE_MAXLEN + 1]; - char errbuf[26 + MYSQL_DATABASE_MAXLEN]; + uint8_t* data = GWBUF_DATA(pPacket); + *command = data[4]; - SERVER* target = NULL; - - ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket)); - - if (this->m_closed) + switch (*command) { - return 0; - } - - if (this->m_shard.empty()) - { - /* Generate database list */ - gen_databaselist(); - } - - /** - * If the databases are still being mapped or if the client connected - * with a default database but no database mapping was performed we need - * to store the query. Once the databases have been mapped and/or the - * default database is taken into use we can send the query forward. - */ - if (this->m_state & (INIT_MAPPING | INIT_USE_DB)) - { - int init_rval = 1; - char* querystr = modutil_get_SQL(pPacket); - MXS_INFO("Storing query for session %p: %s", - this->m_client->session, - querystr); - MXS_FREE(querystr); - pPacket = gwbuf_make_contiguous(pPacket); - GWBUF* ptr = this->m_queue; - - while (ptr && ptr->next) - { - ptr = ptr->next; - } - - if (ptr == NULL) - { - this->m_queue = pPacket; - } - else - { - ptr->next = pPacket; - - } - - if (this->m_state == (INIT_READY | INIT_USE_DB)) - { - /** - * This state is possible if a client connects with a default database - * and the shard map was found from the router cache - */ - if (!handle_default_db()) - { - init_rval = 0; - } - } - - return init_rval; - } - - packet = GWBUF_DATA(pPacket); - packet_type = packet[4]; - qc_query_op_t op = QUERY_OP_UNDEFINED; - - if (detect_show_shards(pPacket)) - { - process_show_shards(); - gwbuf_free(pPacket); - return 1; - } - - switch (packet_type) - { - case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ - case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ - case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */ - case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */ - case MYSQL_COM_PING: /*< 0e all servers are pinged */ + case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ + case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ + case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */ + case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */ + case MYSQL_COM_PING: /*< 0e all servers are pinged */ case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */ - case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */ + case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */ case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */ - case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */ - qtype = QUERY_TYPE_SESSION_WRITE; + case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */ + *type = QUERY_TYPE_SESSION_WRITE; break; - case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */ - case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */ - qtype = QUERY_TYPE_WRITE; + case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */ + case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */ + *type = QUERY_TYPE_WRITE; break; case MYSQL_COM_QUERY: - qtype = qc_get_type_mask(pPacket); - op = qc_get_operation(pPacket); + *type = qc_get_type_mask(pPacket); + *op = qc_get_operation(pPacket); break; case MYSQL_COM_STMT_PREPARE: - qtype = qc_get_type_mask(pPacket); - qtype |= QUERY_TYPE_PREPARE_STMT; + *type = qc_get_type_mask(pPacket); + *type |= QUERY_TYPE_PREPARE_STMT; break; case MYSQL_COM_STMT_EXECUTE: /** Parsing is not needed for this type of packet */ - qtype = QUERY_TYPE_EXEC_STMT; + *type = QUERY_TYPE_EXEC_STMT; break; - case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ - case MYSQL_COM_STATISTICS: /**< 9 ? */ - case MYSQL_COM_PROCESS_INFO: /**< 0a ? */ - case MYSQL_COM_CONNECT: /**< 0b ? */ - case MYSQL_COM_PROCESS_KILL: /**< 0c ? */ - case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */ + case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ + case MYSQL_COM_STATISTICS: /**< 9 ? */ + case MYSQL_COM_PROCESS_INFO: /**< 0a ? */ + case MYSQL_COM_CONNECT: /**< 0b ? */ + case MYSQL_COM_PROCESS_KILL: /**< 0c ? */ + case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */ case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */ - case MYSQL_COM_DAEMON: /**< 1d ? */ + case MYSQL_COM_DAEMON: /**< 1d ? */ default: break; } @@ -353,167 +269,50 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) { char *sql; int sql_len; - char* qtypestr = qc_typemask_to_string(qtype); + char* qtypestr = qc_typemask_to_string(*type); modutil_extract_SQL(pPacket, &sql, &sql_len); MXS_INFO("> Command: %s, stmt: %.*s %s%s", - STRPACKETTYPE(packet_type), sql_len, sql, + STRPACKETTYPE(*command), sql_len, sql, (pPacket->hint == NULL ? "" : ", Hint:"), (pPacket->hint == NULL ? "" : STRHINTTYPE(pPacket->hint->type))); MXS_FREE(qtypestr); } - /** - * Find out whether the query should be routed to single server or to - * all of them. - */ +} - if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) +SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, + uint32_t type, + uint8_t command, + route_target_t& route_target) +{ + SERVER* target = NULL; + + if (route_target != TARGET_NAMED_SERVER) { - change_successful = change_current_db(this->m_current_db, - this->m_shard, - pPacket); - if (!change_successful) + /** We either don't know or don't care where this query should go */ + target = get_shard_target(pPacket, type); + + if (target && SERVER_IS_RUNNING(target)) { - extract_database(pPacket, db); - snprintf(errbuf, 25 + MYSQL_DATABASE_MAXLEN, "Unknown database: %s", db); - - if (this->m_config.debug) - { - sprintf(errbuf + strlen(errbuf), - " ([%lu]: DB change failed)", - this->m_client->session->ses_id); - } - - write_error_to_client(this->m_client, - SCHEMA_ERR_DBNOTFOUND, - SCHEMA_ERRSTR_DBNOTFOUND, - errbuf); - - MXS_ERROR("Changing database failed."); - gwbuf_free(pPacket); - return 1; - } - } - - /** Create the response to the SHOW DATABASES from the mapped databases */ - if (qc_query_is_type(qtype, QUERY_TYPE_SHOW_DATABASES)) - { - if (send_database_list()) - { - ret = 1; - } - - gwbuf_free(pPacket); - return ret; - } - - route_target = get_shard_route_target(qtype); - - if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) - { - route_target = TARGET_UNDEFINED; - target = this->m_shard.get_location(this->m_current_db); - - if (target) - { - MXS_INFO("INIT_DB for database '%s' on server '%s'", - this->m_current_db.c_str(), target->unique_name); route_target = TARGET_NAMED_SERVER; } - else - { - MXS_INFO("INIT_DB with unknown database"); - } - } - else if (route_target != TARGET_ALL) - { - /** If no database is found in the query and there is no active database - * or hints in the query we route the query to the first available - * server. This isn't ideal for monitoring server status but works if - * we just want the server to send an error back. */ - - target = get_shard_target(pPacket, qtype); - - if (target) - { - if (SERVER_IS_RUNNING(target)) - { - route_target = TARGET_NAMED_SERVER; - } - else - { - MXS_INFO("Backend server '%s' is not in a viable state", target->unique_name); - - /** - * Shard is not a viable target right now so we check - * for an alternate backend with the database. If this is not found - * the target is undefined and an error will be returned to the client. - */ - } - } } if (TARGET_IS_UNDEFINED(route_target)) { - target = get_shard_target(pPacket, qtype); + /** We don't know where to send this. Route it to either the server with + * the current default database or to the first available server. */ + target = get_shard_target(pPacket, type); - if ((target == NULL && - packet_type != MYSQL_COM_INIT_DB && - this->m_current_db.length() == 0) || - packet_type == MYSQL_COM_FIELD_LIST || - (this->m_current_db.length() == 0)) + if ((target == NULL && command != MYSQL_COM_INIT_DB && this->m_current_db.length() == 0) || + command == MYSQL_COM_FIELD_LIST || + this->m_current_db.length() == 0) { - /** - * No current database and no databases in query or - * the database is ignored, route to first available backend. - */ - + /** No current database and no databases in query or the database is + * ignored, route to first available backend. */ route_target = TARGET_ANY; - MXS_INFO("Routing query to first available backend."); - } - else - { - if (!change_successful) - { - /** - * Bad shard status. The changing of the database - * was not successful and the error message was already sent. - */ - - ret = 1; - } - else - { - MXS_ERROR("Error : Router internal failure (schemarouter)"); - /** Something else went wrong, terminate connection */ - ret = 0; - } - - gwbuf_free(pPacket); - return ret; - } - } - - if (TARGET_IS_ALL(route_target)) - { - /** - * It is not sure if the session command in question requires - * response. Statement is examined in route_session_write. - * Router locking is done inside the function. - */ - succp = route_session_write(pPacket, packet_type); - - if (succp) - { - atomic_add(&m_router.m_stats.n_sescmd, 1); - atomic_add(&m_router.m_stats.n_queries, 1); - ret = 1; - } - - gwbuf_free(pPacket); - return ret; } if (TARGET_IS_ANY(route_target)) @@ -533,54 +332,166 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) { /**No valid backends alive*/ MXS_ERROR("Failed to route query, no backends are available."); - gwbuf_free(pPacket); - return 0; } - } + return target; +} + +int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) +{ + ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket)); + + if (this->m_closed) + { + return 0; + } + + if (this->m_shard.empty()) + { + /* Generate database list */ + gen_databaselist(); + } + + int ret = 0; + /** - * Query is routed to one of the backends + * If the databases are still being mapped or if the client connected + * with a default database but no database mapping was performed we need + * to store the query. Once the databases have been mapped and/or the + * default database is taken into use we can send the query forward. */ - if (TARGET_IS_NAMED_SERVER(route_target) && target) + if (this->m_state & (INIT_MAPPING | INIT_USE_DB)) { - /** - * Search backend server by name or replication lag. - * If it fails, then try to find valid slave or master. - */ + this->m_queue.push_back(pPacket); - succp = get_shard_dcb(&target_dcb, target->unique_name); - - if (!succp) + if (this->m_state == (INIT_READY | INIT_USE_DB)) { - MXS_INFO("Was supposed to route to named server " - "%s but couldn't find the server in a " - "suitable state.", target->unique_name); + /** + * This state is possible if a client connects with a default database + * and the shard map was found from the router cache + */ + if (handle_default_db()) + { + ret = 1; + } } + return ret; } - if (succp) /*< Have DCB of the target backend */ - { - backend_ref_t *bref = get_bref_from_dcb(target_dcb); + uint8_t command = 0; + SERVER* target = NULL; + uint32_t type = QUERY_TYPE_UNKNOWN; + qc_query_op_t op = QUERY_OP_UNDEFINED; + route_target_t route_target = TARGET_UNDEFINED; - MXS_INFO("Route query to \t%s:%d <", - bref->backend->server->name, - bref->backend->server->port); - /** - * Store current stmt if execution of previous session command - * haven't completed yet. Note that according to MySQL protocol - * there can only be one such non-sescmd stmt at the time. - */ - if (bref->session_commands.size() > 0) + inspect_query(pPacket, &type, &op, &command); + + /** Create the response to the SHOW DATABASES from the mapped databases */ + if (qc_query_is_type(type, QUERY_TYPE_SHOW_DATABASES)) + { + if (send_database_list()) { - ss_dassert((bref->pending_cmd == NULL || - this->m_closed)); - bref->pending_cmd = pPacket; + ret = 1; + } + + gwbuf_free(pPacket); + return ret; + } + else if (detect_show_shards(pPacket)) + { + if (process_show_shards()) + { + ret = 1; + } + gwbuf_free(pPacket); + return ret; + } + + /** The default database changes must be routed to a specific server */ + if (command == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) + { + if (!change_current_db(m_current_db, m_shard, pPacket)) + { + char db[MYSQL_DATABASE_MAXLEN + 1]; + extract_database(pPacket, db); + gwbuf_free(pPacket); + + char errbuf[128 + MYSQL_DATABASE_MAXLEN]; + snprintf(errbuf, sizeof(errbuf), "Unknown database: %s", db); + + if (this->m_config.debug) + { + sprintf(errbuf + strlen(errbuf), + " ([%lu]: DB change failed)", + this->m_client->session->ses_id); + } + + write_error_to_client(this->m_client, + SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, + errbuf); return 1; } - if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) + route_target = TARGET_UNDEFINED; + target = this->m_shard.get_location(this->m_current_db); + + if (target) + { + MXS_INFO("INIT_DB for database '%s' on server '%s'", + this->m_current_db.c_str(), target->unique_name); + route_target = TARGET_NAMED_SERVER; + } + else + { + MXS_INFO("INIT_DB with unknown database"); + } + } + else + { + route_target = get_shard_route_target(type); + } + + /** + * Find a suitable server that matches the requirements of @c route_target + */ + + if (TARGET_IS_ALL(route_target)) + { + /** Session commands, route to all servers */ + if (route_session_write(pPacket, command)) + { + atomic_add(&m_router.m_stats.n_sescmd, 1); + atomic_add(&m_router.m_stats.n_queries, 1); + ret = 1; + } + } + else + { + target = resolve_query_target(pPacket, type, command, route_target); + } + + DCB* target_dcb = NULL; + + if (TARGET_IS_NAMED_SERVER(route_target) && target && + get_shard_dcb(&target_dcb, target->unique_name)) + { + /** We know where to route this query */ + backend_ref_t *bref = get_bref_from_dcb(target_dcb); + + MXS_INFO("Route query to \t%s:%d <", bref->backend->server->name, bref->backend->server->port); + + if (bref->session_commands.size() > 0) + { + /** Store current statement if execution of the previous + * session command hasn't been completed. */ + ss_dassert((bref->pending_cmd == NULL || this->m_closed)); + bref->pending_cmd = pPacket; + ret = 1; + } + else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) { backend_ref_t* bref; @@ -600,220 +511,157 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) } gwbuf_free(pPacket); - return ret; } +void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket) +{ + int rc = inspect_backend_mapping_states(bref, &pPacket); + + if (rc == 1) + { + synchronize_shard_map(); + this->m_state &= ~INIT_MAPPING; + + /* Check if the session is reconnecting with a database name + * that is not in the hashtable. If the database is not found + * then close the session. */ + + if (this->m_state & INIT_USE_DB) + { + if (!handle_default_db()) + { + rc = -1; + } + } + + if (this->m_queue.size() && rc != -1) + { + ss_dassert(this->m_state == INIT_READY); + route_queued_query(); + } + } + + if (rc == -1) + { + poll_fake_hangup_event(this->m_client); + } +} + +void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket) +{ + if (bref->session_commands.size() > 0) + { + /** We are executing a session command */ + if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket))) + { + if (this->m_replied_sescmd < this->m_sent_sescmd && + bref->session_commands.front().get_position() == this->m_replied_sescmd + 1) + { + /** First reply to this session command, route it to the client */ + ++this->m_replied_sescmd; + } + else + { + /** The reply to this session command has already been sent to + * the client, discard it */ + gwbuf_free(*ppPacket); + *ppPacket = NULL; + } + + bref->session_commands.pop_front(); + } + + if (*ppPacket) + { + bref_clear_state(bref, BREF_WAITING_RESULT); + } + } + else if (BREF_IS_QUERY_ACTIVE(bref)) + { + bref_clear_state(bref, BREF_QUERY_ACTIVE); + /** Set response status as replied */ + bref_clear_state(bref, BREF_WAITING_RESULT); + } +} void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) { - backend_ref_t* bref; - GWBUF* writebuf = pPacket; + backend_ref_t* bref = get_bref_from_dcb(pDcb); - /** - * Lock router client session for secure read of router session members. - * Note that this could be done without lock by using version # - */ - if (this->m_closed) + if (this->m_closed || bref == NULL) { gwbuf_free(pPacket); return; } - /** Holding lock ensures that router session remains open */ - ss_dassert(pDcb->session != NULL); - DCB *client_dcb = pDcb->session->client_dcb; - - bref = get_bref_from_dcb(pDcb); - - if (bref == NULL) - { - gwbuf_free(writebuf); - return; - } - MXS_DEBUG("Reply from [%s] session [%p]" " mapping [%s] queries queued [%s]", bref->backend->server->unique_name, this->m_client->session, this->m_state & INIT_MAPPING ? "true" : "false", - this->m_queue == NULL ? "none" : - this->m_queue->next ? "multiple" : "one"); - - + this->m_queue.size() == 0 ? "none" : + this->m_queue.size() > 0 ? "multiple" : "one"); if (this->m_state & INIT_MAPPING) { - int rc = inspect_backend_mapping_states(bref, &writebuf); - gwbuf_free(writebuf); - writebuf = NULL; - - if (rc == 1) - { - synchronize_shard_map(); - - /* - * Check if the session is reconnecting with a database name - * that is not in the hashtable. If the database is not found - * then close the session. - */ - this->m_state &= ~INIT_MAPPING; - - if (this->m_state & INIT_USE_DB) - { - bool success = handle_default_db(); - if (!success) - { - dcb_close(this->m_client); - } - return; - } - - if (this->m_queue) - { - ss_dassert(this->m_state == INIT_READY); - route_queued_query(); - } - } - - if (rc == -1) - { - dcb_close(this->m_client); - } - return; + handle_mapping_reply(bref, pPacket); } - - if (this->m_state & INIT_USE_DB) + else if (this->m_state & INIT_USE_DB) { MXS_DEBUG("Reply to USE '%s' received for session %p", - this->m_connect_db.c_str(), - this->m_client->session); + this->m_connect_db.c_str(), this->m_client->session); this->m_state &= ~INIT_USE_DB; this->m_current_db = this->m_connect_db; ss_dassert(this->m_state == INIT_READY); - if (this->m_queue) + if (this->m_queue.size()) { route_queued_query(); } - - gwbuf_free(writebuf); - return; } - if (this->m_queue) + else if (this->m_queue.size()) { ss_dassert(this->m_state == INIT_READY); route_queued_query(); - return; } - - - - /** - * Active cursor means that reply is from session command - * execution. - */ - if (bref->session_commands.size() > 0) + else { - if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) + process_response(bref, &pPacket); + + if (pPacket) { - /** - * Discard all those responses that have already been sent to - * the client. Return with buffer including response that - * needs to be sent to client or NULL. - */ - if (this->m_replied_sescmd < this->m_sent_sescmd && - bref->session_commands.front().get_position() == this->m_replied_sescmd + 1) + MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket); + pPacket = NULL; + } + + if (bref->session_commands.size() > 0) + { + /** There are pending session commands to be executed. */ + MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", + bref->backend->server->name, bref->backend->server->port); + execute_sescmd_in_backend(bref); + } + else if (bref->pending_cmd) /*< non-sescmd is waiting to be routed */ + { + CHK_GWBUF(bref->pending_cmd); + int ret = bref->dcb->func.write(bref->dcb, bref->pending_cmd); + bref->pending_cmd = NULL; + + if (ret == 1) { - ++this->m_replied_sescmd; + atomic_add(&this->m_router.m_stats.n_queries, 1); + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); } else { - /** The reply to this session command has already been sent - * to the client. */ - gwbuf_free(writebuf); - writebuf = NULL; - } - bref->session_commands.pop_front(); - } - /** - * If response will be sent to client, decrease waiter count. - * This applies to session commands only. Counter decrement - * for other type of queries is done outside this block. - */ - if (writebuf != NULL && client_dcb != NULL) - { - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - } - /** - * Clear BREF_QUERY_ACTIVE flag and decrease waiter counter. - * This applies for queries other than session commands. - */ - else if (BREF_IS_QUERY_ACTIVE(bref)) - { - bref_clear_state(bref, BREF_QUERY_ACTIVE); - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - - if (writebuf != NULL && client_dcb != NULL) - { - unsigned char* cmd = (unsigned char*) writebuf->start; - int state = this->m_state; - /** Write reply to client DCB */ - MXS_INFO("returning reply [%s] " - "state [%s] session [%p]", - PTR_IS_ERR(cmd) ? "ERR" : PTR_IS_OK(cmd) ? "OK" : "RSET", - state & INIT_UNINT ? "UNINIT" : state & INIT_MAPPING ? "MAPPING" : "READY", - this->m_client->session); - MXS_SESSION_ROUTE_REPLY(pDcb->session, writebuf); - } - - /** There is one pending session command to be executed. */ - if (bref->session_commands.size() > 0) - { - - MXS_INFO("Backend %s:%d processed reply and starts to execute " - "active cursor.", - bref->backend->server->name, - bref->backend->server->port); - - execute_sescmd_in_backend(bref); - } - else if (bref->pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ - { - int ret; - - CHK_GWBUF(bref->pending_cmd); - - if ((ret = bref->dcb->func.write(bref->dcb, gwbuf_clone(bref->pending_cmd))) == 1) - { - atomic_add(&this->m_router.m_stats.n_queries, 1); - /** - * Add one query response waiter to backend reference - */ - bref_set_state(bref, BREF_QUERY_ACTIVE); - bref_set_state(bref, BREF_WAITING_RESULT); - } - else - { - char* sql = modutil_get_SQL(bref->pending_cmd); - - if (sql) - { - MXS_ERROR("Routing query \"%s\" failed.", sql); - MXS_FREE(sql); - } - else - { - MXS_ERROR("Routing query failed."); + MXS_ERROR("Routing of pending query failed."); } } - gwbuf_free(bref->pending_cmd); - bref->pending_cmd = NULL; } + + gwbuf_free(pPacket); } void SchemaRouterSession::handleError(GWBUF* pMessage, @@ -1230,9 +1078,9 @@ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) * @param rses Router client session * @return 0 on success, -1 on error */ -int SchemaRouterSession::process_show_shards() +bool SchemaRouterSession::process_show_shards() { - int rval = -1; + bool rval = false; ServerMap pContent; this->m_shard.get_content(pContent); @@ -1244,7 +1092,7 @@ int SchemaRouterSession::process_show_shards() resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); resultset_stream_mysql(rset, this->m_client); resultset_free(rset); - rval = 0; + rval = true; } return rval; @@ -1342,9 +1190,9 @@ bool SchemaRouterSession::handle_default_db() void SchemaRouterSession::route_queued_query() { - GWBUF* tmp = this->m_queue; - this->m_queue = this->m_queue->next; - tmp->next = NULL; + GWBUF* tmp = this->m_queue.front().release(); + this->m_queue.pop_front(); + #ifdef SS_DEBUG char* querystr = modutil_get_SQL(tmp); MXS_DEBUG("Sending queued buffer for session %p: %s", @@ -1352,6 +1200,7 @@ void SchemaRouterSession::route_queued_query() querystr); MXS_FREE(querystr); #endif + poll_add_epollin_event_to_dcb(this->m_client, tmp); } @@ -1416,7 +1265,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, /** Send the client an error about duplicate databases * if there is a queued query from the client. */ - if (this->m_queue) + if (this->m_queue.size()) { GWBUF* error = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DUPLICATEDB, diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 173950008..d8515a8ba 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -16,6 +16,7 @@ #include "schemarouter.hh" #include +#include #include #include @@ -23,6 +24,9 @@ #include "shard_map.hh" #include "session_command.hh" +using std::string; +using std::list; + /** * Bitmask values for the router session's initialization. These values are used * to prevent responses from internal commands being forwarded to the client. @@ -167,7 +171,7 @@ private: string m_connect_db; /**< Database the user was trying to connect to */ string m_current_db; /**< Current active database */ int m_state; /**< Initialization state bitmask */ - GWBUF* m_queue; /**< Query that was received before the session was ready */ + list m_queue; /**< Query that was received before the session was ready */ ROUTER_STATS m_stats; /**< Statistics for this router */ uint64_t m_sent_sescmd; /**< The latest session command being executed */ uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */ @@ -184,9 +188,13 @@ private: bool send_database_list(); int gen_databaselist(); int inspect_backend_mapping_states(backend_ref_t *bref, GWBUF** wbuf); - int process_show_shards(); + bool process_show_shards(); showdb_response_t parse_showdb_response(backend_ref_t* bref, GWBUF** buffer); void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg); void route_queued_query(); void synchronize_shard_map(); + void handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket); + void process_response(backend_ref_t* bref, GWBUF** ppPacket); + SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command, + route_target_t& route_target); };