diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index 38e1e6d14..60812852b 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -143,9 +143,9 @@ static void inspect_query(GWBUF* pPacket, uint32_t* type, qc_query_op_t* op, uin case MXS_COM_DEBUG: /*< 0d all servers dump debug info to stdout */ case MXS_COM_PING: /*< 0e all servers are pinged */ case MXS_COM_CHANGE_USER: /*< 11 all servers change it accordingly */ - case MXS_COM_STMT_CLOSE: /*< free prepared statement */ - case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */ - case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */ + //case MXS_COM_STMT_CLOSE: /*< free prepared statement */ + //case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */ + //case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */ *type = QUERY_TYPE_SESSION_WRITE; break; @@ -447,6 +447,15 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) pPacket = NULL; ret = 1; } + else if (qc_query_is_type(type, QUERY_TYPE_PREPARE_STMT)) + { + if (handle_statement(pPacket, bref, command, type)) + { + atomic_add(&m_router->m_stats.n_sescmd, 1); + atomic_add(&m_router->m_stats.n_queries, 1); + ret = 1; + } + } else if (bref->write(pPacket)) { /** Add one query response waiter to backend reference */ @@ -502,10 +511,22 @@ void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPa if (bref->has_session_commands()) { ss_dassert(GWBUF_IS_COLLECTED_RESULT(*ppPacket)); + uint8_t command = bref->next_session_command()->get_command(); uint64_t id = bref->complete_session_command(); + MXS_PS_RESPONSE resp = {}; if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1) { + if (command == MXS_COM_STMT_PREPARE) + { + mxs_mysql_extract_ps_response(*ppPacket, &resp); + MXS_INFO("ID: %lu HANDLE: %lu", (unsigned long)id, (unsigned long)resp.id); + m_shard.add_ps_handle(id, resp.id); + MXS_INFO("STMT SERVER: %s", bref->backend()->server->name); + m_shard.add_statement(id, bref->backend()->server); + uint8_t* ptr = GWBUF_DATA(*ppPacket) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, id); + } /** First reply to this session command, route it to the client */ ++m_replied_sescmd; } @@ -564,9 +585,11 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) { process_sescmd_response(bref, &pPacket); - ss_dassert(bref->is_waiting_result()); - /** Set response status as replied */ - bref->ack_write(); + if (bref->is_waiting_result()) + { + /** Set response status as replied */ + bref->ack_write(); + } if (pPacket) { @@ -1409,8 +1432,9 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) SERVER *rval = NULL; bool has_dbs = false; /**If the query targets any database other than the current one*/ qc_query_op_t op = QUERY_OP_UNDEFINED; + uint8_t command = mxs_mysql_get_command(buffer); - if (mxs_mysql_get_command(buffer) == MXS_COM_QUERY) + if (command == MXS_COM_QUERY) { op = qc_get_operation(buffer); int n_tables = 0; @@ -1495,6 +1519,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) if (rval) { + MXS_INFO("PREPARING NAMED %s ON SERVER %s", stmt, rval->name); m_shard.add_statement(stmt, rval); } MXS_FREE(tables); @@ -1504,6 +1529,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) { char* stmt = qc_get_prepare_name(buffer); rval = m_shard.get_statement(stmt); + MXS_INFO("EXECUTING NAMED %s ON SERVER %s", stmt, rval->name); MXS_FREE(stmt); } else if (qc_query_is_type(qtype, QUERY_TYPE_DEALLOC_PREPARE)) @@ -1511,10 +1537,38 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) char* stmt = qc_get_prepare_name(buffer); if ((rval = m_shard.get_statement(stmt))) { + MXS_INFO("DEALLOCING NAMED %s ON SERVER %s", stmt, rval->name); m_shard.remove_statement(stmt); } MXS_FREE(stmt); } + else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT)) + { + int n_tables = 0; + char** tables = qc_get_table_names(buffer, &n_tables, true); + for (int i = 0; i < n_tables; i++) + { + rval = m_shard.get_location(tables[0]); + MXS_FREE(tables[i]); + } + rval ? MXS_INFO("PREPARE STATEMENT ON SERVER %s", rval->name) : + MXS_INFO("PREPARE STATEMENT TARGETS NO MAPPED TABLES"); + + MXS_FREE(tables); + } + else if (mxs_mysql_is_ps_command(command)) + { + uint32_t id = mxs_mysql_extract_ps_id(buffer); + uint32_t handle = m_shard.get_ps_handle(id); + uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, handle); + rval = m_shard.get_statement(id); + if (command == MXS_COM_STMT_CLOSE) + { + MXS_INFO("CLOSING STATEMENT %d ", id); + m_shard.remove_statement(id); + } + } if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) { @@ -1741,4 +1795,48 @@ bool SchemaRouterSession::send_tables(GWBUF* pPacket) return rval; } +bool SchemaRouterSession::handle_statement(GWBUF* querybuf, SSRBackend& bref, uint8_t command, uint32_t type) +{ + bool succp = false; + + atomic_add(&m_stats.longest_sescmd, 1); + + /** Increment the session command count */ + ++m_sent_sescmd; + + if (bref->in_use()) + { + GWBUF *buffer = gwbuf_clone(querybuf); + bref->append_session_command(buffer, m_sent_sescmd); + + if (bref->session_command_count() == 1) + { + if (bref->execute_session_command()) + { + succp = true; + atomic_add_uint64(&bref->server()->stats.packets, 1); + } + else + { + MXS_ERROR("Failed to execute session " + "command in %s:%d", + bref->backend()->server->address, + bref->backend()->server->port); + } + } + else + { + ss_dassert(bref->session_command_count() > 1); + /** The server is already executing a session command */ + MXS_INFO("Backend %s:%d already executing sescmd.", + bref->backend()->server->address, + bref->backend()->server->port); + succp = true; + } + } + + gwbuf_free(querybuf); + return succp; +} + } \ No newline at end of file diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 29ddb43b8..f6f5b5488 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -147,6 +147,7 @@ private: void route_queued_query(); void synchronize_shards(); void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket); + bool handle_statement(GWBUF* querybuf, SSRBackend& bref, uint8_t command, uint32_t type); /** Member variables */ bool m_closed; /**< True if session closed */ diff --git a/server/modules/routing/schemarouter/shard_map.cc b/server/modules/routing/schemarouter/shard_map.cc index 27f440776..ddc44ca08 100644 --- a/server/modules/routing/schemarouter/shard_map.cc +++ b/server/modules/routing/schemarouter/shard_map.cc @@ -36,6 +36,33 @@ void Shard::add_statement(std::string stmt, SERVER* target) stmt_map[stmt] = target; } +void Shard::add_statement(uint32_t id, SERVER* target) +{ + MXS_DEBUG("ADDING ID: [%u] server: [%s]", id, target->name); + m_binary_map[id] = target; +} + +void Shard::add_ps_handle(uint32_t id, uint32_t handle) +{ + MXS_DEBUG("ID: [%u] HANDLE: [%u]", id, handle); + m_ps_handles[id] = handle; +} + +bool Shard::remove_ps_handle(uint32_t id) +{ + return m_ps_handles.erase(id); +} + +uint32_t Shard::get_ps_handle(uint32_t id) +{ + PSHandleMap::iterator it = m_ps_handles.find(id); + if (it != m_ps_handles.end()) + { + return it->second; + } + return 0; +} + void Shard::replace_location(std::string db, SERVER* target) { m_map[db] = target; @@ -94,11 +121,27 @@ SERVER* Shard::get_statement(std::string stmt) return rval; } +SERVER* Shard::get_statement(uint32_t id) +{ + SERVER* rval = NULL; + BinaryPSMap::iterator iter = m_binary_map.find(id); + if(iter != m_binary_map.end()) + { + rval = iter->second; + } + return rval; +} + bool Shard::remove_statement(std::string stmt) { return stmt_map.erase(stmt); } +bool Shard::remove_statement(uint32_t id) +{ + return m_binary_map.erase(id); +} + bool Shard::stale(double max_interval) const { time_t now = time(NULL); @@ -164,4 +207,4 @@ void ShardManager::update_shard(Shard& shard, std::string user) { m_maps[user] = shard; } -} +} \ No newline at end of file diff --git a/server/modules/routing/schemarouter/shard_map.hh b/server/modules/routing/schemarouter/shard_map.hh index 09c6e7ed1..d0bdab4d3 100644 --- a/server/modules/routing/schemarouter/shard_map.hh +++ b/server/modules/routing/schemarouter/shard_map.hh @@ -25,6 +25,8 @@ using namespace maxscale; /** This contains the database to server mapping */ typedef std::unordered_map ServerMap; +typedef std::unordered_map BinaryPSMap; +typedef std::unordered_map PSHandleMap; class Shard { @@ -52,10 +54,14 @@ public: SERVER* get_location(std::string db); void add_statement(std::string stmt, SERVER* target); - + void add_statement(uint32_t id, SERVER* target); + void add_ps_handle(uint32_t id, uint32_t handle); + uint32_t get_ps_handle(uint32_t id); + bool remove_ps_handle(uint32_t id); SERVER* get_statement(std::string stmt); - + SERVER* get_statement(uint32_t id); bool remove_statement(std::string stmt); + bool remove_statement(uint32_t id); /** * @brief Change the location of a database @@ -100,6 +106,8 @@ public: private: ServerMap m_map; ServerMap stmt_map; + BinaryPSMap m_binary_map; + PSHandleMap m_ps_handles; time_t m_last_updated; };