From 8d50450b5a9eddb8fc4f7f2e6c5848e996f583a0 Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 20 Jun 2019 16:10:19 +0800 Subject: [PATCH 1/4] MXS-2521:Route subseqenct COM_STMT_EXECUTE to the same server which first COM_STMT_EXECUTE was executed on --- .../readwritesplit/rwsplit_internal.hh | 2 +- .../readwritesplit/rwsplit_route_stmt.cc | 59 +++++++++++++++---- .../readwritesplit/rwsplit_session_cmd.cc | 2 +- .../routing/readwritesplit/rwsplitsession.cc | 8 ++- .../routing/readwritesplit/rwsplitsession.hh | 6 +- 5 files changed, 60 insertions(+), 17 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index fd4745a9d..aa0d80b5e 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -78,7 +78,7 @@ void handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf, SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, route_target_t route_target); SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses, - uint8_t cmd, uint32_t id); + const GWBUF *query, const RouteInfo& info); bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses, SRWBackend* dest); bool handle_got_target(RWSplit *inst, RWSplitSession *rses, diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 0ac458292..741977a54 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -93,7 +93,7 @@ void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses, } route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer, - uint8_t* command, uint32_t* type, uint32_t* stmt_id) + uint8_t* command, uint32_t* type, uint32_t* stmt_id, uint16_t* n_params) { route_target_t route_target = TARGET_MASTER; bool in_read_only_trx = rses->target_node && session_trx_is_read_only(rses->client_dcb->session); @@ -161,7 +161,7 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer, } else if (is_ps_command(*command)) { - *stmt_id = get_internal_ps_id(rses, buffer); + *stmt_id = get_internal_ps_id(rses, buffer, n_params); *type = rses->ps_manager.get_type(*stmt_id); } @@ -250,7 +250,7 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con } else if (TARGET_IS_SLAVE(route_target)) { - if ((target = handle_slave_is_target(inst, rses, command, stmt_id))) + if ((target = handle_slave_is_target(inst, rses, querybuf, info))) { succp = true; @@ -999,6 +999,38 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, return target; } +/** + * @brief Determine whether this stmt is subsequent COM_STMT_EXECUTE + * + * @param cmd command type + * @param query GWBUF including the query + * @param info Route info + * + * return is subsequent COM_STMT_EXECUTE + */ +bool is_sub_stmt_exec(uint8_t cmd, const GWBUF *query, uint16_t n_params) +{ + if (cmd != COM_STMT_EXECUTE || n_params == 0) + { + return false; + } + + bool rval = true; + + /*https://mariadb.com/kb/en/library/com_stmt_execute/*/ + /*need n_params to parse new_params_bound_flag(alias send type to server)*/ + int new_params_bound_flag_offset = MYSQL_HEADER_LEN + 10 + (n_params + 7) / 8; + ss_dassert(gwbuf_length(query) <= new_params_bound_flag_offset); + uint8_t data[new_params_bound_flag_offset]; + gwbuf_copy_data(query, 0, new_params_bound_flag_offset, data); + if (data[new_params_bound_flag_offset]) + { + rval = false; + } + + return rval; +} + /** * @brief Handle slave is the target * @@ -1006,20 +1038,27 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, * * @param inst Router instance * @param ses Router session - * @param target_dcb DCB for the target server + * @param query GWBUF including the query + * @param info Holding routing related information * * @return bool - true if succeeded, false otherwise */ SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses, - uint8_t cmd, uint32_t stmt_id) + const GWBUF *query, const RouteInfo& info) { int rlag_max = rses_get_max_replication_lag(rses); SRWBackend target; + uint8_t cmd = info.command; + uint32_t stmt_id = info.stmt_id; + uint16_t n_params = info.n_params; - if (cmd == MXS_COM_STMT_FETCH) + if (cmd == MXS_COM_STMT_FETCH || is_sub_stmt_exec(cmd, query, n_params)) { /** The COM_STMT_FETCH must be executed on the same server as the - * COM_STMT_EXECUTE was executed on */ + * COM_STMT_EXECUTE was executed on, the subsequent COM_STMT_EXECUTE also + */ + + const char* command_str = cmd == MXS_COM_STMT_FETCH ? "COM_STMT_FETCH" : "subseqent COM_STMT_EXECUTE"; ExecMap::iterator it = rses->exec_map.find(stmt_id); if (it != rses->exec_map.end()) @@ -1027,17 +1066,17 @@ SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses, if (it->second->in_use()) { target = it->second; - MXS_INFO("COM_STMT_FETCH on %s", target->name()); + MXS_INFO("%s on %s", command_str, target->name()); } else { MXS_ERROR("Old COM_STMT_EXECUTE target %s not in use, cannot " - "proceed with COM_STMT_FETCH", it->second->name()); + "proceed with %s", it->second->name(), command_str); } } else { - MXS_WARNING("Unknown statement ID %u used in COM_STMT_FETCH", stmt_id); + MXS_WARNING("Unknown statement ID %u used in %s", stmt_id, command_str); } } else diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index f9bb2684e..6324ea778 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -122,7 +122,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend, { /** Map the returned response to the internal ID */ MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - rses->ps_handles[resp.id] = id; + rses->ps_handles[resp.id] = (id << 16) + resp.parameters; } // Discard any slave connections that did not return the same result diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 858614f3b..1079e7761 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -110,7 +110,7 @@ bool RWBackend::consume_fetched_rows(GWBUF* buffer) return m_expected_rows == 0; } -uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer) +uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_params) { uint32_t rval = 0; @@ -120,7 +120,8 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer) if (it != rses->ps_handles.end()) { - rval = it->second; + rval = it->second >> 16; + *n_params = it->second & 0xffff; } else { @@ -135,7 +136,8 @@ RouteInfo::RouteInfo(RWSplitSession* rses, GWBUF* buffer): target(TARGET_UNDEFINED), command(0xff), type(QUERY_TYPE_UNKNOWN), - stmt_id(0) + stmt_id(0), + n_params(0) { target = get_target_type(rses, buffer, &command, &type, &stmt_id); } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 33e09c87b..4e5b49ff1 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -32,7 +32,7 @@ enum reply_state_t rstostr((a)->get_reply_state()), rstostr(b)); typedef std::map BackendHandleMap; /** Internal ID to external ID */ -typedef std::map ClientHandleMap; /** External ID to internal ID */ +typedef std::map ClientHandleMap; /** External ID to internal ID */ class RWBackend: public mxs::Backend { @@ -171,6 +171,7 @@ struct RouteInfo uint8_t command; /**< The command byte, 0xff for unknown commands */ uint32_t type; /**< The query type, QUERY_TYPE_UNKNOWN for unknown types*/ uint32_t stmt_id; /**< Prepared statement ID, 0 for unknown */ + uint16_t n_params; /**< Prepared statement params count */ }; /** @@ -202,7 +203,8 @@ static inline const char* rstostr(reply_state_t state) * * @param rses Router client session * @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE + * @param n_params statement parmas number * * @return The internal ID of the prepared statement that the buffer contents refer to */ -uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer); +uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_params); From 6b31b80e76069669456ca7c2949ab6c72e87b54f Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 20 Jun 2019 16:38:07 +0800 Subject: [PATCH 2/4] fix wrong assert --- server/modules/routing/readwritesplit/rwsplit_route_stmt.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 741977a54..23f2f7e0d 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -1020,7 +1020,7 @@ bool is_sub_stmt_exec(uint8_t cmd, const GWBUF *query, uint16_t n_params) /*https://mariadb.com/kb/en/library/com_stmt_execute/*/ /*need n_params to parse new_params_bound_flag(alias send type to server)*/ int new_params_bound_flag_offset = MYSQL_HEADER_LEN + 10 + (n_params + 7) / 8; - ss_dassert(gwbuf_length(query) <= new_params_bound_flag_offset); + ss_dassert((int)gwbuf_length(query) >= new_params_bound_flag_offset); uint8_t data[new_params_bound_flag_offset]; gwbuf_copy_data(query, 0, new_params_bound_flag_offset, data); if (data[new_params_bound_flag_offset]) From 51ce3c53fd74bb2788e83d3d4d7ca17d0bb23ec9 Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 20 Jun 2019 19:52:20 +0800 Subject: [PATCH 3/4] shift 32 --- server/modules/routing/readwritesplit/rwsplit_session_cmd.cc | 2 +- server/modules/routing/readwritesplit/rwsplitsession.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index 6324ea778..7bd56edbc 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -122,7 +122,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend, { /** Map the returned response to the internal ID */ MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - rses->ps_handles[resp.id] = (id << 16) + resp.parameters; + rses->ps_handles[resp.id] = (id << 32) + resp.parameters; } // Discard any slave connections that did not return the same result diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 1079e7761..4d168c573 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -120,8 +120,8 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_par if (it != rses->ps_handles.end()) { - rval = it->second >> 16; - *n_params = it->second & 0xffff; + rval = it->second >> 32; + *n_params = it->second & 0xffffffff; } else { From 5c762bb84137fd8f571084e69fd3a2eb32273861 Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 27 Jun 2019 10:54:56 +0800 Subject: [PATCH 4/4] misc fix --- .../modules/routing/readwritesplit/rwsplit_route_stmt.cc | 8 ++++---- server/modules/routing/readwritesplit/rwsplitsession.hh | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 23f2f7e0d..2b2d0ec15 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -1002,7 +1002,7 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, /** * @brief Determine whether this stmt is subsequent COM_STMT_EXECUTE * - * @param cmd command type + * @param cmd command type * @param query GWBUF including the query * @param info Route info * @@ -1021,9 +1021,9 @@ bool is_sub_stmt_exec(uint8_t cmd, const GWBUF *query, uint16_t n_params) /*need n_params to parse new_params_bound_flag(alias send type to server)*/ int new_params_bound_flag_offset = MYSQL_HEADER_LEN + 10 + (n_params + 7) / 8; ss_dassert((int)gwbuf_length(query) >= new_params_bound_flag_offset); - uint8_t data[new_params_bound_flag_offset]; - gwbuf_copy_data(query, 0, new_params_bound_flag_offset, data); - if (data[new_params_bound_flag_offset]) + uint8_t flag; + gwbuf_copy_data(query, new_params_bound_flag_offset, 1, &flag); + if (flag) { rval = false; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 4e5b49ff1..0ff32d5c0 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -203,7 +203,7 @@ static inline const char* rstostr(reply_state_t state) * * @param rses Router client session * @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE - * @param n_params statement parmas number + * @param n_params Statement parmas count * * @return The internal ID of the prepared statement that the buffer contents refer to */