diff --git a/protocol_1.0/mod_skysql.c b/protocol_1.0/mod_skysql.c index bf58500ef..7ad3b167d 100644 --- a/protocol_1.0/mod_skysql.c +++ b/protocol_1.0/mod_skysql.c @@ -1,6 +1,6 @@ ////////////////////////////////// // SKYSQL GATEWAY main module -// By Massimiliano Pinto 2012 +// By Massimiliano Pinto 2012/2013 // SkySQL AB ////////////////////////////////// // @@ -487,11 +487,13 @@ static int mysql_connect(char *host, int port, char *dbname, char *user, char *p } if (passwd != NULL) { - uint8_t hash1[APR_SHA1_DIGESTSIZE]; - uint8_t hash2[APR_SHA1_DIGESTSIZE]; - uint8_t new_sha[APR_SHA1_DIGESTSIZE]; + uint8_t hash1[APR_SHA1_DIGESTSIZE]=""; + uint8_t hash2[APR_SHA1_DIGESTSIZE]=""; + uint8_t new_sha[APR_SHA1_DIGESTSIZE]=""; - skysql_sha1_str(passwd, strlen(passwd), hash1); + + //skysql_sha1_str("xxxx", strlen("xxxx"), hash1); + memcpy(hash1, passwd, 20); skysql_sha1_str(hash1, 20, hash2); bin2hex(dbpass, hash2, 20); skysql_sha1_2_str(scramble, 20, hash2, 20, new_sha); @@ -509,7 +511,6 @@ static int mysql_connect(char *host, int port, char *dbname, char *user, char *p fprintf(stderr, "\n]\n"); fflush(stderr); #endif - } if (dbname == NULL) { @@ -700,6 +701,7 @@ static int skysql_process_connection(conn_rec *c) { skysql_client_auth *mysql_client_data = NULL; mysql_driver_details *mysql_driver = NULL; MYSQL_conn *conn = NULL; + MYSQL_conn *write_conn = NULL; apr_pool_t *pool = NULL; int max_queries_per_connection = 0; uint8_t mysql_command = 0; @@ -712,11 +714,20 @@ static int skysql_process_connection(conn_rec *c) { uint8_t stage1_hash[20 +1] =""; conn_details *find_server = NULL; + char *selected_host_m = NULL; char *selected_host = NULL; + char *selected_host_temp = NULL; char *selected_dbname = NULL; int selected_shard = 0; int selected_port = 0; + int selected_port_m = 0; + backend_list l; + + // current slave + int curr_s = 1; + + //server to client socket timeout apr_interval_time_t timeout = 300000000; ///////////////////////////////////////// @@ -783,6 +794,8 @@ static int skysql_process_connection(conn_rec *c) { rv = skysql_read_client_autentication(c, pool, scramble, scramble_len, mysql_client_data, stage1_hash); + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "Client MySQL Stage1 Hash [%s]", stage1_hash); + // client authentication data stored if (!rv) { @@ -833,51 +846,65 @@ static int skysql_process_connection(conn_rec *c) { ///////////////////////// if (!conf->pool_enabled) { + curr_s = 1; ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MySQL backend open/close"); conn = mysql_init(NULL); + write_conn = mysql_init(NULL); if (conn == NULL) { - ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL init Error %u: %s", 1, "No memory"); + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Read conn init Error %u: %s", 1, "No memory"); + return 500; + } + if (write_conn == NULL) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Write conn init Error %u: %s", 1, "No memory"); return 500; } - - // do the connect - // find config data - find_server = apr_hash_get(conf->resources, "loadbal", APR_HASH_KEY_STRING); - if (find_server != NULL) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find was DONE"); - - //switch(find_server->type) - if (find_server->nshards == 1) { - selected_port = atoi(strchr(find_server->server_list, ':') + 1); - selected_host = apr_pstrndup(c->pool, find_server->server_list, strchr(find_server->server_list, ':') - find_server->server_list); - selected_shard = 1; - } else { - selected_shard = select_random_slave_server(find_server->nshards); - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find [%i] servers", find_server->nshards); - get_server_from_list(&selected_host, &selected_port, find_server->server_list, selected_shard, c->pool); - } - } else { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find KO: using default!"); - selected_port = 3306; - selected_host = apr_pstrdup(c->pool, "127.0.0.1"); - } - - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL backend selection [%i], [%s]:[%i]", selected_shard, selected_host, selected_port); if (mysql_client_data->database != NULL) { selected_dbname = mysql_client_data->database; } else { selected_dbname = "test"; } - - if (mysql_connect(selected_host, selected_port, selected_dbname, mysql_client_data->username, "pippo", conn, mysql_driver->compress) != 0) { - //if (mysql_real_connect(conn, "192.168.1.40", "root", "pippo", "test", 3306, NULL, 0) == NULL) // - ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Connect [%s:%i] Error %u: %s", selected_host, selected_port, mysql_errno(conn), mysql_error(conn)); - return 500; + + + // backend selection: Master and Slave load balancing (if more than one slave in config) + + l.list = config_area; + curr_s = select_random_slave_server(l.list, &l.num); + + if (l.num > 1) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL M/S RunTime config [%s], RoundRobin selected slave is [%i]", config_area, curr_s); } else { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL RunTime Opened connection to backend"); - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Backend Server TID %i, scamble_buf [%5s]", conn->tid, conn->scramble); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL M/S RunTime config [%s]", config_area); + } + + ///////////////////////////////// + // find a SLAVE in the list + // index > 0 is for slave + ///////////////////////////////// + get_server_from_list(&selected_host_temp, &selected_port, config_area, curr_s, c->pool); + selected_host=apr_pstrdup(c->pool, selected_host_temp); + + if (mysql_connect(selected_host, selected_port, selected_dbname, mysql_client_data->username, stage1_hash, conn, mysql_driver->compress) != 0) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Read Connect [%s:%i] Error %u: %s", selected_host, selected_port, mysql_errno(conn), mysql_error(conn)); + return 500; + } else { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL READ RunTime Opened connection to backend [%s:%i:%s]", selected_host, selected_port, selected_dbname); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "READ Backend Server TID %i, scamble_buf [%5s]", conn->tid, conn->scramble); + } + + ////////////////////////////////// + // find the MASTER in the list + ////////////////////////////////// + get_master_from_list(&selected_host_temp, &selected_port_m, config_area, c->pool); + selected_host_m=apr_pstrdup(c->pool, selected_host_temp); + + if (mysql_connect(selected_host_m, selected_port_m, selected_dbname, mysql_client_data->username, stage1_hash, write_conn, mysql_driver->compress) != 0) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Write Connect [%s:%i] Error %u: %s", selected_host_m, selected_port_m, mysql_errno(write_conn), mysql_error(write_conn)); + return 500; + } else { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL Write RunTime Opened connection to backend [%s:%i:%s]", selected_host_m, selected_port_m, selected_dbname); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Write Backend Server TID %i, scamble_buf [%5s]", write_conn->tid, write_conn->scramble); } } else { @@ -902,6 +929,7 @@ static int skysql_process_connection(conn_rec *c) { while(1) { char i_username[100]=""; + int query_logic=0; ////////////////////////////////////////////////////////////// // the new pool is allocated on c->pool // this new pool is the right one for the while(1) main loop @@ -970,13 +998,13 @@ static int skysql_process_connection(conn_rec *c) { /////////////////////////// rv = apr_bucket_read(bucket, &data, &len, APR_BLOCK_READ); - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Input data with len [%i]", len); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Client Input data len [%i]", len); if (rv != APR_SUCCESS) { char errmsg[256]=""; apr_strerror(rv, errmsg, sizeof(errmsg)); child_stopped_reading = 1; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Child stopped reading [%s]", errmsg); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL Gateway stops reading [%s]", errmsg); } //////////////////////////////////////////////////////// // current data is copied into a pool allocated buffer @@ -1015,7 +1043,8 @@ static int skysql_process_connection(conn_rec *c) { } // check the mysql thread id, for pre-openend the ti is in conf->tid - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Serving Client with MySQL Thread ID [%lu]", conn->tid); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Serving Client with Read MySQL Thread ID [%lu]", conn->tid); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Serving Client with Write MySQL Thread ID [%lu]", write_conn->tid); mysql_command = query_from_client[4]; @@ -1023,40 +1052,47 @@ static int skysql_process_connection(conn_rec *c) { // now processing the mysql_command ///////////////////////////////////// - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Client Input command [%x]", mysql_command); - update_gateway_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, c, NULL, apr_psprintf(pool, "GATEWAY: MYSQL loop Command [%x], DB [%s]", mysql_command, selected_dbname)); switch (mysql_command) { case 0x0e : - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_PING"); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_PING"); // reponse sent directly to the client // no ping to backend, for now skysql_send_ok(c, pool, 1, 0, NULL); break; case 0x04 : - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_FIELD_LIST", query_from_client+5); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_FIELD_LIST"); mysql_pass_packet(conn, query_from_client, query_from_client_len); mysql_receive_packet(c, pool, conn); - //skysql_send_ok(c, pool, 1, 0, NULL); break; case 0x1b : - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_SET_OPTION"); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_SET_OPTION"); skysql_send_eof(c, pool, 1); break; case 0x0d : - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_DEBUG"); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_DEBUG"); skysql_send_ok(c, pool, 1, 0, NULL); break; case 0x03 : - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_QUERY"); - skygateway_query_result(c, pool, conn, query_from_client+5); - //skysql_send_ok(c, pool, 1, 0, NULL); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_QUERY, Read/write Splitting is enabled"); + /////////////////////////////// + //read/write splitting logic + // passing: actual backend server config, the sql statement, protocol_command, actual_slave + //////////////////////////////// + query_logic = query_routing(config_area, query_from_client+5, 0x03, curr_s); + if (query_logic == SKYSQL_READ) { + // sql to the slave + skygateway_query_result(c, pool, conn, query_from_client+5); + } else { + // sql to the master + skygateway_query_result(c, pool, write_conn, query_from_client+5); + } break; case 0x16 : ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_PREPARE"); @@ -1092,6 +1128,10 @@ static int skysql_process_connection(conn_rec *c) { mysql_close(&conn); ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MYSQL_conn is NULL? %i", conn == NULL ? 1 : 0); } + if (!conf->pool_enabled) { + mysql_close(&write_conn); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MYSQL_conn is NULL? %i", write_conn == NULL ? 1 : 0); + } break; case 0x11 : strcpy(i_username, query_from_client+5); @@ -1136,10 +1176,15 @@ static int skysql_process_connection(conn_rec *c) { if (conn != NULL) { if (!conf->pool_enabled) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, ">> opened connection found!, close it with COM_QUIT"); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, ">> READ opened connection found!, close it with COM_QUIT"); mysql_close(&conn); - } + } + if (!conf->pool_enabled) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, ">> WRITE opened connection found!, close it with COM_QUIT"); + + mysql_close(&write_conn); + } } // hey, it was okay to handle the protocol connectioni, is thereanything else to do? @@ -1154,18 +1199,33 @@ static int skysql_process_connection(conn_rec *c) { ///////////////////////////////// // The sample content handler // Only with HTTP protocol -// so it's useless now +// now only configuration commands, then // will be useful with JSON //////////////////////////////// static int skysql_handler(request_rec *r) { + skysql_server_conf *conf; + if (strcmp(r->handler, "skysql")) { return DECLINED; } + r->content_type = "text/html"; if (!r->header_only) - ap_rputs("The sample page from mod_skysql.c\n", r); + ap_rputs("SkySQL Gateway HTTP Control\n", r); + + if (r->args != NULL) { + + if (strstr(r->args, "show=1")) { + ap_rprintf(r, "Configuration M/S is [%s]", config_area); + } else if (strstr(r->args, "update=")) { + strcpy(config_area, r->args+7); + ap_rputs("Configuration M/S updated\n", r); + } + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server, "Global M/S [%s]", config_area); + } return OK; } @@ -1176,14 +1236,51 @@ static int skysql_handler(request_rec *r) static int skysql_init_module(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp, server_rec *base_server) { server_rec *s; + int fd_mapped = -1; + const char *userdata_key = "skysql_init_module"; + void *data = NULL; + skysql_server_conf *conf; s = base_server; -/* - do initialization here -*/ + + /* Handle first server startup */ + apr_pool_userdata_get(&data, userdata_key, s->process->pool); + if (data == NULL) { + apr_pool_userdata_set((const void *)1, userdata_key, apr_pool_cleanup_null, s->process->pool); + return OK; + } + + conf = (skysql_server_conf *)ap_get_module_config(s->module_config, &skysql_module); + + /* shared memory for this apache server */ + if ( (fd_mapped = open ("/dev/zero", O_RDWR) ) < 0) { + fprintf(stderr,"Errore in mem_mapped: %i, %s\n",errno,strerror(errno)); + return 500; + } else { + if ( (config_area = (unsigned char *)mmap(0, 40000, PROT_READ | PROT_WRITE, MAP_SHARED, fd_mapped, 0)) == (unsigned char *) -1) { + ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: mem_mapped error [%i], [%s]", errno, strerror(errno)); + } else + ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "mem_mapped OK: [%i] bytes from [%x]", 40000, config_area); + close(fd_mapped); + } + + // the following code is for the virtual hosts setup!!! + /* + for (; s; s = s->next) { + if (find_config != APR_SUCCESS) { + return HTTP_INTERNAL_SERVER_ERROR; + } + } + */ + if (conf->server_list != NULL) { + strcpy(config_area, conf->server_list); + } else { + strcpy(config_area, "127.0.0.1:3307,127.0.0.1:3306,192.168.57.1:3306,START:9090_NULL:config"); + } + ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: Internal structure done"); ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: ext file ver is [%i]", skysql_ext_file_ver()); - + ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: DbServers [%s], enabled [%i]", config_area, conf->protocol_enabled); return OK; } @@ -1218,14 +1315,14 @@ static void skysql_child_init(apr_pool_t *p, server_rec *s) { return ; } else { conf->mysql_tid = conf->conn->tid; - ap_log_error(APLOG_MARK, APLOG_ERR, 0, s, "PID %li SkySQL Child Init & Open connection TID %lu to backend", getpid(), conf->mysql_tid); + //ap_log_error(APLOG_MARK, APLOG_ERR, 0, s, "PID %li SkySQL Child Init & Open connection TID %lu to backend", getpid(), conf->mysql_tid); } // structure deallocation & connection close - apr_pool_cleanup_register(p, conf->conn, child_mysql_close, child_mysql_close); + apr_pool_cleanup_register(p, conf->conn, (apr_status_t (*)(void *)) child_mysql_close, (apr_status_t (*)(void *)) child_mysql_close); } else { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "Generic init flags %i, %i, Skip Protocol Setup & Skip database connection", conf->protocol_enabled, conf->pool_enabled); + //ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "Generic init flags %i, %i, Skip Protocol Setup & Skip database connection", conf->protocol_enabled, conf->pool_enabled); } // next virtual host .. @@ -1310,19 +1407,36 @@ static const char *skysql_single_db_resource(cmd_parms *cmd, void *dconf, const apr_hash_set(conf->resources, apr_pstrdup(cmd->pool, a1), APR_HASH_KEY_STRING, newresource); // creare un contenitore, table??? da agganciare con la key a1 e value a2 - fprintf(stderr, "Config Resource %s with %i servers, [%s]\n", a1, newresource->nshards, newresource->server_list); + fprintf(stderr, "Config Resource [%s] with %i servers, [%s]\n", a1, newresource->nshards, newresource->server_list); fflush(stderr); return NULL; } +static const char *skysql_server_list(cmd_parms *cmd, void *dconf, const char *a1, const char *a2) { + char *ptr_port = NULL; + char *ptr_db = NULL; + char *ptr_host = NULL; + char *ptr_list = NULL; + + skysql_server_conf *conf = ap_get_module_config(cmd->server->module_config, &skysql_module); + + ptr_db = strchr(a2, ';'); + conf->server_list = apr_pstrndup(cmd->pool, a2, ptr_db-a2); + + fprintf(stderr, "Config DBResource is [%s]\n", conf->server_list); + fflush(stderr); + + return NULL; +} ////////////////////////////// // commands implemeted here // ////////////////////////////// static const command_rec skysql_cmds[] = { AP_INIT_FLAG("SkySQLProtocol", skysql_protocol_enable, NULL, RSRC_CONF, "Run an MYSQL protocol on this host"), AP_INIT_FLAG("SkySQLPool", skysql_pool_enable, NULL, RSRC_CONF, "SKYSQL backend servers pool"), - AP_INIT_TAKE2("SkySQLSingleDBbresource", skysql_single_db_resource, NULL, OR_FILEINFO, "a single db resource name"), + //AP_INIT_TAKE2("SkySQLSingleDBbresource", skysql_single_db_resource, NULL, OR_FILEINFO, "a single db resource name"), + AP_INIT_TAKE2("SkySQLSingleDBbresource", skysql_server_list, NULL, OR_FILEINFO, "a single db resource name"), AP_INIT_TAKE1("SkySQLTimeout", skysql_loop_timeout, NULL, OR_FILEINFO, "MYSQL protocol loop timeout"), // SkySQLMaxQueryPerConnection {NULL} diff --git a/protocol_1.0/skysql_backend.c b/protocol_1.0/skysql_backend.c index 4d66e768a..53abfc150 100644 --- a/protocol_1.0/skysql_backend.c +++ b/protocol_1.0/skysql_backend.c @@ -1,29 +1,48 @@ //////////////////////////////////////// // SKYSQL Backend -// By Massimiliano Pinto 2012 +// By Massimiliano Pinto 2012/2013 // SkySQL AB //////////////////////////////////////// #include "skysql_gw.h" -#define SKYSQL_READ 0 -#define SKYSQL_WRITE 1 - -int skysql_query_is_select(const char *query) { - - return SKYSQL_READ; -} - int skysql_ext_file_ver(void) { int ret = 13; return ret; } -int select_random_slave_server(int nslaves) { - int random_balancer = (int) ((nslaves) * (rand() / (RAND_MAX + 1.0))); +////////////////////////////////////////////////////////////// +// The function takes the server list, +// find the total server number +// and return a random selection for slaves (from total -1) +////////////////////////////////////////////////////////////// +int select_random_slave_server(const char *server_list, int *num_slaves) { + int nslaves = 0; + int random_balancer = 0; + char *p = (char *)server_list; + while( (p = strchr(p, ',')) != NULL) { + p++; + nslaves++; + } + + memcpy(num_slaves, &nslaves, sizeof(int)); + + if (nslaves == 1) { + return 1; + } + + // random selection + random_balancer = (int) ((nslaves+1) * (rand() / (RAND_MAX + 1.0))); + return random_balancer; } +/////////////////////////////////////////////////////////////// +// This takes a server from the list +// index 0 is always the Master +// the others refer to the salve, +// the slave number comes from: select_random_slave_server() +/////////////////////////////////////////////////////////////// int get_server_from_list(char **selected_host, int *selected_port, char *server_list, int num, apr_pool_t *p) { int ret = -1; int curr_srv = 0; @@ -42,6 +61,7 @@ int get_server_from_list(char **selected_host, int *selected_port, char *server_ next = server_list; while (curr_srv < num) { + tmp = strchr(next, ','); if (tmp != NULL) { curr_srv++; @@ -52,7 +72,10 @@ int get_server_from_list(char **selected_host, int *selected_port, char *server_ if (curr_srv == num) { port = atoi(strchr(next, ':') + 1); + memcpy(selected_port, &port, sizeof(port)); + + // the host string must be allocated in the memory pool! *selected_host = apr_pstrndup(p, next, strchr(next, ':') - next); ret = 0; @@ -62,3 +85,41 @@ int get_server_from_list(char **selected_host, int *selected_port, char *server_ return ret; } + + +////////////////////////////////////////////// +// This funcion take the master from the list +// The index is always 0 +////////////////////////////////////////////// +int get_master_from_list(char **selected_host, int *selected_port, char *server_list, apr_pool_t *p) { + int ret = -1; + int curr_srv = 0; + char *next = NULL; + char *tmp = NULL; + int port; + + port = atoi(strchr(server_list, ':') + 1), sizeof(port); + memcpy(selected_port, &port, sizeof(int)); + + // the host string must be allocated in the memory pool! + *selected_host = apr_pstrndup(p, server_list, strchr(server_list, ':') - server_list); + + return 1; +} + +/////////////////////////////////////// +// Query Routing basic implementation +/////////////////////////////////////// + +int query_routing(const char *server_list, const char *sql_command, int procotol_command, int current_slave) { + + if (strstr(sql_command, "select ")) { + // to the slave + return SKYSQL_READ; + } else { + // to the master + return SKYSQL_WRITE; + } +} + +////////////////// diff --git a/protocol_1.0/skysql_client.h b/protocol_1.0/skysql_client.h index 36bb1b9d2..cf1155f57 100644 --- a/protocol_1.0/skysql_client.h +++ b/protocol_1.0/skysql_client.h @@ -1,6 +1,6 @@ //////////////////////////////////////// // SKYSQL header file -// By Massimiliano Pinto 2012 +// By Massimiliano Pinto 2012/2013 // SkySQL AB //////////////////////////////////////// @@ -137,7 +137,8 @@ typedef enum #define MAX_CHUNK SMALL_CHUNK * 8 * 4 #define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10) -//#define MYSQL_CONN_DEBUG +#define MYSQL_CONN_DEBUG +#undef MYSQL_CONN_DEBUG typedef struct { apr_socket_t *socket; diff --git a/protocol_1.0/skysql_gw.h b/protocol_1.0/skysql_gw.h index 5e088929d..9f26a5007 100644 --- a/protocol_1.0/skysql_gw.h +++ b/protocol_1.0/skysql_gw.h @@ -1,6 +1,6 @@ //////////////////////////////////////// // SKYSQL header file -// By Massimiliano Pinto 2012 +// By Massimiliano Pinto 2012/2013 // SkySQL AB //////////////////////////////////////// @@ -38,11 +38,17 @@ // getpid #include +// mapped I/O +#include + #include "skysql_client.h" #define SKYSQL_GATEWAY_VERSION "0.0.1" #define SKYSQL_VERSION "5.5.22-SKY-1.6.5" +#define SKYSQL_READ 0 +#define SKYSQL_WRITE 1 + #define HTTP_WELCOME_MESSAGE "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\nContent-Type: text/plain\r\n\r\nSKYSQL Gateway " SKYSQL_GATEWAY_VERSION #define SKYSQL_LISTENER_VERSION "MySQL Community Server (GPL)" @@ -55,6 +61,8 @@ module AP_MODULE_DECLARE_DATA skysql_module; +static unsigned char *config_area=NULL; + //const int SKY_SQL_MAX_PACKET_LEN = 0xffffffL; typedef struct { @@ -64,6 +72,7 @@ typedef struct { int protocol_enabled; int pool_enabled; char backend_servers[2][128]; + char *server_list; apr_hash_t *resources; int loop_timeout; } skysql_server_conf; @@ -96,6 +105,11 @@ typedef struct { int compress; } mysql_driver_details; +typedef struct { + int num; + char *list; +} backend_list; + int skysql_ext_file_ver(); int skysql_query_is_select(const char *query); apr_status_t skysql_read_client_autentication(conn_rec *c, apr_pool_t *pool, uint8_t *scramble, int scramble_len, skysql_client_auth *mysql_client_data, uint8_t *stage1_hash); @@ -105,10 +119,25 @@ apr_status_t skysql_send_error (conn_rec *c, uint8_t packet_number, MYSQL_conn * apr_status_t skysql_send_ok(conn_rec *c, apr_pool_t *p, uint8_t packet_number, uint8_t in_affected_rows, const char* skysql_message); apr_status_t skysql_send_eof(conn_rec *c, apr_pool_t *p, uint8_t packet_number); apr_status_t skysql_send_result(conn_rec *c, uint8_t *data, uint8_t len); -int select_random_slave_server(int nslaves); +int select_random_slave_server(const char *server_listi, int *num_slaves); apr_status_t gateway_send_error (conn_rec *c, apr_pool_t *p, uint8_t packet_number); apr_status_t gateway_reply_data(conn_rec *c, apr_pool_t *pool, void *data, int len); char *gateway_find_user_password_sha1(char *username, void *repository, conn_rec *c, apr_pool_t *p); void skysql_sha1_str(const uint8_t *in, int in_len, uint8_t *out); int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query); -//apr_status_t skysql_change_user(conn_rec *c, apr_pool_t *p, char *username, char *database, MYSQL *conn, uint8_t *stage1_hash); +char *bin2hex(char *out, const uint8_t *in, unsigned int len); +void skysql_sha1_2_str(const uint8_t *in, int in_len, const uint8_t *in2, int in2_len, uint8_t *out); +void skysql_str_xor(char *output, const uint8_t *input1, const uint8_t *input2, unsigned int len); +int get_server_from_list(char **selected_host, int *selected_port, char *server_list, int num, apr_pool_t *p); +int get_master_from_list(char **selected_host, int *selected_port, char *server_list, apr_pool_t *p); +int mysql_pass_packet(MYSQL_conn *conn, const char *command, int len); +int mysql_receive_packet(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn); +int skygateway_statement_prepare_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query, int len); +int skygateway_statement_execute_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query, int len); +int mysql_send_command(MYSQL_conn *conn, const char *command, int cmd, int len); +apr_status_t skysql_change_user(conn_rec *c, apr_pool_t *p, char *username, char *database, MYSQL_conn *conn, uint8_t *stage1_hash); +int query_routing(const char *server_list, const char *sql_command, int procotol_command, int current_slave); +unsigned int mysql_errno(MYSQL_conn *mysql); +const char *mysql_error(MYSQL_conn *mysql); +const char *mysql_sqlstate(MYSQL_conn *mysql); +int mysql_query(MYSQL_conn *conn, const char *query); diff --git a/protocol_1.0/skysql_utils.c b/protocol_1.0/skysql_utils.c index f41fa8dee..9eaf2c027 100644 --- a/protocol_1.0/skysql_utils.c +++ b/protocol_1.0/skysql_utils.c @@ -1,6 +1,6 @@ //////////////////////////////////////// // SKYSQL Utils -// By Massimiliano Pinto 2012 +// By Massimiliano Pinto 2012/2013 // SkySQL AB //////////////////////////////////////// @@ -143,10 +143,10 @@ int skysql_check_scramble(conn_rec *c, apr_pool_t *p, uint8_t *token, unsigned i uint8_t check_hash[APR_SHA1_DIGESTSIZE]; char hex_double_sha1[2 * APR_SHA1_DIGESTSIZE + 1]=""; - uint8_t *password = gateway_find_user_password_sha1("pippo", NULL, c, p); + uint8_t *password = gateway_find_user_password_sha1(username, NULL, c, p); bin2hex(hex_double_sha1, password, APR_SHA1_DIGESTSIZE); - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "stored hex(SHA1(SHA1(password))) [%s]", hex_double_sha1); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "The Gateway stored hex(SHA1(SHA1(password))) for \"%s\" [%s]", username, hex_double_sha1); // possible, now skipped /* @@ -168,6 +168,12 @@ int skysql_check_scramble(conn_rec *c, apr_pool_t *p, uint8_t *token, unsigned i ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SHA1 di SHA1(client password) [%s]", check_hash); + if (1) { + char inpass[100]=""; + bin2hex(inpass, check_hash, 20); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "The CLIENT hex(SHA1(SHA1(password))) for \"%s\" [%s]", username, inpass); + } + return memcmp(password, check_hash, APR_SHA1_DIGESTSIZE); } @@ -846,6 +852,9 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const apr_status_t poll_rv; int is_eof = 0; + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending query to backend [%lu] ...", conn->tid); + + // send the query to the backend query_ret = mysql_query(conn, query); if (query_ret) { @@ -855,16 +864,14 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const return 1; } - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending result set ..."); + //poll_rv = apr_pollset_create(&pset, 1, p, 0); - poll_rv = apr_pollset_create(&pset, 1, p, 0); - - pfd.p = p; - pfd.desc_type = APR_POLL_SOCKET; - pfd.reqevents = APR_POLLIN; - pfd.rtnevents = APR_POLLIN; - pfd.desc.s = conn->socket; - pfd.client_data = NULL; + //pfd.p = p; + //pfd.desc_type = APR_POLL_SOCKET; + //pfd.reqevents = APR_POLLIN; + //pfd.rtnevents = APR_POLLIN; + //pfd.desc.s = conn->socket; + //pfd.client_data = NULL; //rv = apr_pollset_add(pset, &pfd); @@ -872,6 +879,9 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const apr_socket_timeout_set(conn->socket, 100000000); + // read query resut from backend + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is receiving query result from backend ..."); + while(1) { char errmesg_p[1000]=""; bytes=MAX_CHUNK; @@ -884,8 +894,6 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const //fprintf(stderr, "wait Errore in recv, rv is %i, [%s]\n", rv, errmesg_p); //fflush(stderr); - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is receiving ..."); - //apr_socket_atreadeof(conn->socket, &is_eof); rv = apr_socket_recv(conn->socket, buffer, &bytes); @@ -920,13 +928,13 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const apr_brigade_destroy(bb1); - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive, brigade sent with %li bytes", bytes); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive, brigade sent to the client with %li bytes", bytes); cycle++; if (bytes < MAX_CHUNK) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: less bytes than buffer here, Return from query result: total bytes %lu in %i", tot_bytes, cycle); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: less bytes than buffer here, Query Result: total bytes %lu in %i", tot_bytes, cycle); return 0; } @@ -1339,3 +1347,15 @@ int mysql_receive_packet(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn) { return 0; } + + +backend_list select_backend_servers() { + backend_list l; + + memset(&l, '\0', sizeof(backend_list)); + + l.num = 2; + l.list = "127.0.0.1:3307,127.0.0.1:3306,xxxx:11"; + + return l; +}