From 8d4f3015457a5bcce3c1aa1d478735b1b7c4c24a Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Thu, 23 Apr 2015 20:12:04 +0300 Subject: [PATCH] Added the option to disable session command history to readwritesplit. --- server/modules/include/readwritesplit.h | 4 ++ .../routing/readwritesplit/readwritesplit.c | 56 ++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 30a7e9489..7c94a9b13 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -150,6 +150,7 @@ typedef struct mysql_sescmd_st { unsigned char reply_cmd; /*< The reply command. One of OK, ERR, RESULTSET or * LOCAL_INFILE. Slave servers are compared to this * when they return session command replies.*/ + int position; /*< Position of this command */ #if defined(SS_DEBUG) skygw_chk_t my_sescmd_chk_tail; #endif @@ -184,6 +185,7 @@ typedef struct sescmd_cursor_st { rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ bool scmd_cur_active; /*< true if command is being executed */ + int position; /*< Position of this cursor */ #if defined(SS_DEBUG) skygw_chk_t scmd_cur_chk_tail; #endif @@ -248,6 +250,7 @@ typedef struct rwsplit_config_st { int rw_max_slave_replication_lag; target_t rw_use_sql_variables_in; int rw_max_sescmd_history_size; + bool disable_sescmd_hist; } rwsplit_config_t; @@ -291,6 +294,7 @@ struct router_client_session { bool rses_autocommit_enabled; bool rses_transaction_active; DCB* client_dcb; + int pos_generator; #if defined(PREP_STMT_CACHING) HASHTABLE* rses_prep_stmt[2]; #endif diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 9c884eda2..b424a6376 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -628,7 +628,7 @@ createInstance(SERVICE *service, char **options) * If server weighting has been defined calculate the percentage * of load that will be sent to each server. This is only used for * calculating the least connections, either globally or within a - * service, or the numebr of current operations on a server. + * service, or the number of current operations on a server. */ if ((weightby = serviceGetWeightingParameter(service)) != NULL) { @@ -698,6 +698,13 @@ createInstance(SERVICE *service, char **options) { rwsplit_process_router_options(router, options); } + + /** These options cancel each other out */ + if(router->rwsplit_config.disable_sescmd_hist && router->rwsplit_config.rw_max_sescmd_history_size > 0) + { + router->rwsplit_config.rw_max_sescmd_history_size = 0; + } + /** * Set default value for max_slave_connections and for slave selection * criteria. If parameter is set in config file max_slave_connections @@ -807,7 +814,7 @@ static void* newSession( rwsplit_process_router_options(router, router->service->routerOptions); } /** Copy config struct from router instance */ - client_rses->rses_config = router->rwsplit_config; + memcpy(&client_rses->rses_config,&router->rwsplit_config,sizeof(rwsplit_config_t)); spinlock_release(&router->lock); /** @@ -3667,7 +3674,8 @@ static mysql_sescmd_t* mysql_sescmd_init ( /** Set session command buffer */ sescmd->my_sescmd_buf = sescmd_buf; sescmd->my_sescmd_packet_type = packet_type; - + sescmd->position = atomic_add(&rses->pos_generator,1); + return sescmd; } @@ -3724,6 +3732,7 @@ static GWBUF* sescmd_cursor_process_replies( while (scmd != NULL && replybuf != NULL) { bref->reply_cmd = *((unsigned char*)replybuf->start + 4); + scur->position = scmd->position; /** Faster backend has already responded to client : discard */ if (scmd->my_sescmd_is_replied) { @@ -4361,6 +4370,43 @@ static bool route_session_write( goto return_succp; } + if(router_cli_ses->rses_config.disable_sescmd_hist) + { + rses_property_t *prop, *tmp; + backend_ref_t* bref; + bool conflict; + + prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; + while(prop) + { + conflict = false; + + for(i = 0;irses_nbackends;i++) + { + bref = &backend_ref[i]; + if(BREF_IS_IN_USE(bref)) + { + + if(bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position) + { + conflict = true; + break; + } + } + } + + if(conflict) + { + break; + } + + tmp = prop; + router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next; + rses_property_done(tmp); + prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; + } + } + /** * Additional reference is created to querybuf to * prevent it from being released before properties @@ -4538,6 +4584,10 @@ static void rwsplit_process_router_options( { router->rwsplit_config.rw_max_sescmd_history_size = atoi(value); } + else if(strcmp(options[i],"disable_sescmd_history") == 0) + { + router->rwsplit_config.disable_sescmd_hist = config_truth_value(value); + } } } /*< for */ }