New version

This commit is contained in:
Massimiliano Pinto 2013-04-09 16:05:47 +02:00
parent 92d3a89fcb
commit 9a8c9ab403
5 changed files with 321 additions and 96 deletions

View File

@ -1,6 +1,6 @@
//////////////////////////////////
// SKYSQL GATEWAY main module
// By Massimiliano Pinto 2012
// By Massimiliano Pinto 2012/2013
// SkySQL AB
//////////////////////////////////
//
@ -487,11 +487,13 @@ static int mysql_connect(char *host, int port, char *dbname, char *user, char *p
}
if (passwd != NULL) {
uint8_t hash1[APR_SHA1_DIGESTSIZE];
uint8_t hash2[APR_SHA1_DIGESTSIZE];
uint8_t new_sha[APR_SHA1_DIGESTSIZE];
uint8_t hash1[APR_SHA1_DIGESTSIZE]="";
uint8_t hash2[APR_SHA1_DIGESTSIZE]="";
uint8_t new_sha[APR_SHA1_DIGESTSIZE]="";
skysql_sha1_str(passwd, strlen(passwd), hash1);
//skysql_sha1_str("xxxx", strlen("xxxx"), hash1);
memcpy(hash1, passwd, 20);
skysql_sha1_str(hash1, 20, hash2);
bin2hex(dbpass, hash2, 20);
skysql_sha1_2_str(scramble, 20, hash2, 20, new_sha);
@ -509,7 +511,6 @@ static int mysql_connect(char *host, int port, char *dbname, char *user, char *p
fprintf(stderr, "\n]\n");
fflush(stderr);
#endif
}
if (dbname == NULL) {
@ -700,6 +701,7 @@ static int skysql_process_connection(conn_rec *c) {
skysql_client_auth *mysql_client_data = NULL;
mysql_driver_details *mysql_driver = NULL;
MYSQL_conn *conn = NULL;
MYSQL_conn *write_conn = NULL;
apr_pool_t *pool = NULL;
int max_queries_per_connection = 0;
uint8_t mysql_command = 0;
@ -712,11 +714,20 @@ static int skysql_process_connection(conn_rec *c) {
uint8_t stage1_hash[20 +1] ="";
conn_details *find_server = NULL;
char *selected_host_m = NULL;
char *selected_host = NULL;
char *selected_host_temp = NULL;
char *selected_dbname = NULL;
int selected_shard = 0;
int selected_port = 0;
int selected_port_m = 0;
backend_list l;
// current slave
int curr_s = 1;
//server to client socket timeout
apr_interval_time_t timeout = 300000000;
/////////////////////////////////////////
@ -783,6 +794,8 @@ static int skysql_process_connection(conn_rec *c) {
rv = skysql_read_client_autentication(c, pool, scramble, scramble_len, mysql_client_data, stage1_hash);
ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "Client MySQL Stage1 Hash [%s]", stage1_hash);
// client authentication data stored
if (!rv) {
@ -833,51 +846,65 @@ static int skysql_process_connection(conn_rec *c) {
/////////////////////////
if (!conf->pool_enabled) {
curr_s = 1;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MySQL backend open/close");
conn = mysql_init(NULL);
write_conn = mysql_init(NULL);
if (conn == NULL) {
ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL init Error %u: %s", 1, "No memory");
ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Read conn init Error %u: %s", 1, "No memory");
return 500;
}
if (write_conn == NULL) {
ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Write conn init Error %u: %s", 1, "No memory");
return 500;
}
// do the connect
// find config data
find_server = apr_hash_get(conf->resources, "loadbal", APR_HASH_KEY_STRING);
if (find_server != NULL) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find was DONE");
//switch(find_server->type)
if (find_server->nshards == 1) {
selected_port = atoi(strchr(find_server->server_list, ':') + 1);
selected_host = apr_pstrndup(c->pool, find_server->server_list, strchr(find_server->server_list, ':') - find_server->server_list);
selected_shard = 1;
} else {
selected_shard = select_random_slave_server(find_server->nshards);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find [%i] servers", find_server->nshards);
get_server_from_list(&selected_host, &selected_port, find_server->server_list, selected_shard, c->pool);
}
} else {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find KO: using default!");
selected_port = 3306;
selected_host = apr_pstrdup(c->pool, "127.0.0.1");
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL backend selection [%i], [%s]:[%i]", selected_shard, selected_host, selected_port);
if (mysql_client_data->database != NULL) {
selected_dbname = mysql_client_data->database;
} else {
selected_dbname = "test";
}
if (mysql_connect(selected_host, selected_port, selected_dbname, mysql_client_data->username, "pippo", conn, mysql_driver->compress) != 0) {
//if (mysql_real_connect(conn, "192.168.1.40", "root", "pippo", "test", 3306, NULL, 0) == NULL) //
ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Connect [%s:%i] Error %u: %s", selected_host, selected_port, mysql_errno(conn), mysql_error(conn));
return 500;
// backend selection: Master and Slave load balancing (if more than one slave in config)
l.list = config_area;
curr_s = select_random_slave_server(l.list, &l.num);
if (l.num > 1) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL M/S RunTime config [%s], RoundRobin selected slave is [%i]", config_area, curr_s);
} else {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL RunTime Opened connection to backend");
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Backend Server TID %i, scamble_buf [%5s]", conn->tid, conn->scramble);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL M/S RunTime config [%s]", config_area);
}
/////////////////////////////////
// find a SLAVE in the list
// index > 0 is for slave
/////////////////////////////////
get_server_from_list(&selected_host_temp, &selected_port, config_area, curr_s, c->pool);
selected_host=apr_pstrdup(c->pool, selected_host_temp);
if (mysql_connect(selected_host, selected_port, selected_dbname, mysql_client_data->username, stage1_hash, conn, mysql_driver->compress) != 0) {
ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Read Connect [%s:%i] Error %u: %s", selected_host, selected_port, mysql_errno(conn), mysql_error(conn));
return 500;
} else {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL READ RunTime Opened connection to backend [%s:%i:%s]", selected_host, selected_port, selected_dbname);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "READ Backend Server TID %i, scamble_buf [%5s]", conn->tid, conn->scramble);
}
//////////////////////////////////
// find the MASTER in the list
//////////////////////////////////
get_master_from_list(&selected_host_temp, &selected_port_m, config_area, c->pool);
selected_host_m=apr_pstrdup(c->pool, selected_host_temp);
if (mysql_connect(selected_host_m, selected_port_m, selected_dbname, mysql_client_data->username, stage1_hash, write_conn, mysql_driver->compress) != 0) {
ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Write Connect [%s:%i] Error %u: %s", selected_host_m, selected_port_m, mysql_errno(write_conn), mysql_error(write_conn));
return 500;
} else {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL Write RunTime Opened connection to backend [%s:%i:%s]", selected_host_m, selected_port_m, selected_dbname);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Write Backend Server TID %i, scamble_buf [%5s]", write_conn->tid, write_conn->scramble);
}
} else {
@ -902,6 +929,7 @@ static int skysql_process_connection(conn_rec *c) {
while(1) {
char i_username[100]="";
int query_logic=0;
//////////////////////////////////////////////////////////////
// the new pool is allocated on c->pool
// this new pool is the right one for the while(1) main loop
@ -970,13 +998,13 @@ static int skysql_process_connection(conn_rec *c) {
///////////////////////////
rv = apr_bucket_read(bucket, &data, &len, APR_BLOCK_READ);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Input data with len [%i]", len);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Client Input data len [%i]", len);
if (rv != APR_SUCCESS) {
char errmsg[256]="";
apr_strerror(rv, errmsg, sizeof(errmsg));
child_stopped_reading = 1;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Child stopped reading [%s]", errmsg);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL Gateway stops reading [%s]", errmsg);
}
////////////////////////////////////////////////////////
// current data is copied into a pool allocated buffer
@ -1015,7 +1043,8 @@ static int skysql_process_connection(conn_rec *c) {
}
// check the mysql thread id, for pre-openend the ti is in conf->tid
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Serving Client with MySQL Thread ID [%lu]", conn->tid);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Serving Client with Read MySQL Thread ID [%lu]", conn->tid);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Serving Client with Write MySQL Thread ID [%lu]", write_conn->tid);
mysql_command = query_from_client[4];
@ -1023,40 +1052,47 @@ static int skysql_process_connection(conn_rec *c) {
// now processing the mysql_command
/////////////////////////////////////
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Client Input command [%x]", mysql_command);
update_gateway_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, c, NULL, apr_psprintf(pool, "GATEWAY: MYSQL loop Command [%x], DB [%s]", mysql_command, selected_dbname));
switch (mysql_command) {
case 0x0e :
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_PING");
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_PING");
// reponse sent directly to the client
// no ping to backend, for now
skysql_send_ok(c, pool, 1, 0, NULL);
break;
case 0x04 :
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_FIELD_LIST", query_from_client+5);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_FIELD_LIST");
mysql_pass_packet(conn, query_from_client, query_from_client_len);
mysql_receive_packet(c, pool, conn);
//skysql_send_ok(c, pool, 1, 0, NULL);
break;
case 0x1b :
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_SET_OPTION");
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_SET_OPTION");
skysql_send_eof(c, pool, 1);
break;
case 0x0d :
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_DEBUG");
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_DEBUG");
skysql_send_ok(c, pool, 1, 0, NULL);
break;
case 0x03 :
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_QUERY");
skygateway_query_result(c, pool, conn, query_from_client+5);
//skysql_send_ok(c, pool, 1, 0, NULL);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Protocol Command is COM_QUERY, Read/write Splitting is enabled");
///////////////////////////////
//read/write splitting logic
// passing: actual backend server config, the sql statement, protocol_command, actual_slave
////////////////////////////////
query_logic = query_routing(config_area, query_from_client+5, 0x03, curr_s);
if (query_logic == SKYSQL_READ) {
// sql to the slave
skygateway_query_result(c, pool, conn, query_from_client+5);
} else {
// sql to the master
skygateway_query_result(c, pool, write_conn, query_from_client+5);
}
break;
case 0x16 :
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_PREPARE");
@ -1092,6 +1128,10 @@ static int skysql_process_connection(conn_rec *c) {
mysql_close(&conn);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MYSQL_conn is NULL? %i", conn == NULL ? 1 : 0);
}
if (!conf->pool_enabled) {
mysql_close(&write_conn);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MYSQL_conn is NULL? %i", write_conn == NULL ? 1 : 0);
}
break;
case 0x11 :
strcpy(i_username, query_from_client+5);
@ -1136,10 +1176,15 @@ static int skysql_process_connection(conn_rec *c) {
if (conn != NULL) {
if (!conf->pool_enabled) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, ">> opened connection found!, close it with COM_QUIT");
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, ">> READ opened connection found!, close it with COM_QUIT");
mysql_close(&conn);
}
}
if (!conf->pool_enabled) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, ">> WRITE opened connection found!, close it with COM_QUIT");
mysql_close(&write_conn);
}
}
// hey, it was okay to handle the protocol connectioni, is thereanything else to do?
@ -1154,18 +1199,33 @@ static int skysql_process_connection(conn_rec *c) {
/////////////////////////////////
// The sample content handler
// Only with HTTP protocol
// so it's useless now
// now only configuration commands, then
// will be useful with JSON
////////////////////////////////
static int skysql_handler(request_rec *r)
{
skysql_server_conf *conf;
if (strcmp(r->handler, "skysql")) {
return DECLINED;
}
r->content_type = "text/html";
if (!r->header_only)
ap_rputs("The sample page from mod_skysql.c\n", r);
ap_rputs("SkySQL Gateway HTTP Control\n", r);
if (r->args != NULL) {
if (strstr(r->args, "show=1")) {
ap_rprintf(r, "Configuration M/S is [%s]", config_area);
} else if (strstr(r->args, "update=")) {
strcpy(config_area, r->args+7);
ap_rputs("Configuration M/S updated\n", r);
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server, "Global M/S [%s]", config_area);
}
return OK;
}
@ -1176,14 +1236,51 @@ static int skysql_handler(request_rec *r)
static int skysql_init_module(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp, server_rec *base_server) {
server_rec *s;
int fd_mapped = -1;
const char *userdata_key = "skysql_init_module";
void *data = NULL;
skysql_server_conf *conf;
s = base_server;
/*
do initialization here
*/
/* Handle first server startup */
apr_pool_userdata_get(&data, userdata_key, s->process->pool);
if (data == NULL) {
apr_pool_userdata_set((const void *)1, userdata_key, apr_pool_cleanup_null, s->process->pool);
return OK;
}
conf = (skysql_server_conf *)ap_get_module_config(s->module_config, &skysql_module);
/* shared memory for this apache server */
if ( (fd_mapped = open ("/dev/zero", O_RDWR) ) < 0) {
fprintf(stderr,"Errore in mem_mapped: %i, %s\n",errno,strerror(errno));
return 500;
} else {
if ( (config_area = (unsigned char *)mmap(0, 40000, PROT_READ | PROT_WRITE, MAP_SHARED, fd_mapped, 0)) == (unsigned char *) -1) {
ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: mem_mapped error [%i], [%s]", errno, strerror(errno));
} else
ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "mem_mapped OK: [%i] bytes from [%x]", 40000, config_area);
close(fd_mapped);
}
// the following code is for the virtual hosts setup!!!
/*
for (; s; s = s->next) {
if (find_config != APR_SUCCESS) {
return HTTP_INTERNAL_SERVER_ERROR;
}
}
*/
if (conf->server_list != NULL) {
strcpy(config_area, conf->server_list);
} else {
strcpy(config_area, "127.0.0.1:3307,127.0.0.1:3306,192.168.57.1:3306,START:9090_NULL:config");
}
ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: Internal structure done");
ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: ext file ver is [%i]", skysql_ext_file_ver());
ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: DbServers [%s], enabled [%i]", config_area, conf->protocol_enabled);
return OK;
}
@ -1218,14 +1315,14 @@ static void skysql_child_init(apr_pool_t *p, server_rec *s) {
return ;
} else {
conf->mysql_tid = conf->conn->tid;
ap_log_error(APLOG_MARK, APLOG_ERR, 0, s, "PID %li SkySQL Child Init & Open connection TID %lu to backend", getpid(), conf->mysql_tid);
//ap_log_error(APLOG_MARK, APLOG_ERR, 0, s, "PID %li SkySQL Child Init & Open connection TID %lu to backend", getpid(), conf->mysql_tid);
}
// structure deallocation & connection close
apr_pool_cleanup_register(p, conf->conn, child_mysql_close, child_mysql_close);
apr_pool_cleanup_register(p, conf->conn, (apr_status_t (*)(void *)) child_mysql_close, (apr_status_t (*)(void *)) child_mysql_close);
} else {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "Generic init flags %i, %i, Skip Protocol Setup & Skip database connection", conf->protocol_enabled, conf->pool_enabled);
//ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "Generic init flags %i, %i, Skip Protocol Setup & Skip database connection", conf->protocol_enabled, conf->pool_enabled);
}
// next virtual host ..
@ -1310,19 +1407,36 @@ static const char *skysql_single_db_resource(cmd_parms *cmd, void *dconf, const
apr_hash_set(conf->resources, apr_pstrdup(cmd->pool, a1), APR_HASH_KEY_STRING, newresource);
// creare un contenitore, table??? da agganciare con la key a1 e value a2
fprintf(stderr, "Config Resource %s with %i servers, [%s]\n", a1, newresource->nshards, newresource->server_list);
fprintf(stderr, "Config Resource [%s] with %i servers, [%s]\n", a1, newresource->nshards, newresource->server_list);
fflush(stderr);
return NULL;
}
static const char *skysql_server_list(cmd_parms *cmd, void *dconf, const char *a1, const char *a2) {
char *ptr_port = NULL;
char *ptr_db = NULL;
char *ptr_host = NULL;
char *ptr_list = NULL;
skysql_server_conf *conf = ap_get_module_config(cmd->server->module_config, &skysql_module);
ptr_db = strchr(a2, ';');
conf->server_list = apr_pstrndup(cmd->pool, a2, ptr_db-a2);
fprintf(stderr, "Config DBResource is [%s]\n", conf->server_list);
fflush(stderr);
return NULL;
}
//////////////////////////////
// commands implemeted here //
//////////////////////////////
static const command_rec skysql_cmds[] = {
AP_INIT_FLAG("SkySQLProtocol", skysql_protocol_enable, NULL, RSRC_CONF, "Run an MYSQL protocol on this host"),
AP_INIT_FLAG("SkySQLPool", skysql_pool_enable, NULL, RSRC_CONF, "SKYSQL backend servers pool"),
AP_INIT_TAKE2("SkySQLSingleDBbresource", skysql_single_db_resource, NULL, OR_FILEINFO, "a single db resource name"),
//AP_INIT_TAKE2("SkySQLSingleDBbresource", skysql_single_db_resource, NULL, OR_FILEINFO, "a single db resource name"),
AP_INIT_TAKE2("SkySQLSingleDBbresource", skysql_server_list, NULL, OR_FILEINFO, "a single db resource name"),
AP_INIT_TAKE1("SkySQLTimeout", skysql_loop_timeout, NULL, OR_FILEINFO, "MYSQL protocol loop timeout"),
// SkySQLMaxQueryPerConnection
{NULL}

View File

@ -1,29 +1,48 @@
////////////////////////////////////////
// SKYSQL Backend
// By Massimiliano Pinto 2012
// By Massimiliano Pinto 2012/2013
// SkySQL AB
////////////////////////////////////////
#include "skysql_gw.h"
#define SKYSQL_READ 0
#define SKYSQL_WRITE 1
int skysql_query_is_select(const char *query) {
return SKYSQL_READ;
}
int skysql_ext_file_ver(void) {
int ret = 13;
return ret;
}
int select_random_slave_server(int nslaves) {
int random_balancer = (int) ((nslaves) * (rand() / (RAND_MAX + 1.0)));
//////////////////////////////////////////////////////////////
// The function takes the server list,
// find the total server number
// and return a random selection for slaves (from total -1)
//////////////////////////////////////////////////////////////
int select_random_slave_server(const char *server_list, int *num_slaves) {
int nslaves = 0;
int random_balancer = 0;
char *p = (char *)server_list;
while( (p = strchr(p, ',')) != NULL) {
p++;
nslaves++;
}
memcpy(num_slaves, &nslaves, sizeof(int));
if (nslaves == 1) {
return 1;
}
// random selection
random_balancer = (int) ((nslaves+1) * (rand() / (RAND_MAX + 1.0)));
return random_balancer;
}
///////////////////////////////////////////////////////////////
// This takes a server from the list
// index 0 is always the Master
// the others refer to the salve,
// the slave number comes from: select_random_slave_server()
///////////////////////////////////////////////////////////////
int get_server_from_list(char **selected_host, int *selected_port, char *server_list, int num, apr_pool_t *p) {
int ret = -1;
int curr_srv = 0;
@ -42,6 +61,7 @@ int get_server_from_list(char **selected_host, int *selected_port, char *server_
next = server_list;
while (curr_srv < num) {
tmp = strchr(next, ',');
if (tmp != NULL) {
curr_srv++;
@ -52,7 +72,10 @@ int get_server_from_list(char **selected_host, int *selected_port, char *server_
if (curr_srv == num) {
port = atoi(strchr(next, ':') + 1);
memcpy(selected_port, &port, sizeof(port));
// the host string must be allocated in the memory pool!
*selected_host = apr_pstrndup(p, next, strchr(next, ':') - next);
ret = 0;
@ -62,3 +85,41 @@ int get_server_from_list(char **selected_host, int *selected_port, char *server_
return ret;
}
//////////////////////////////////////////////
// This funcion take the master from the list
// The index is always 0
//////////////////////////////////////////////
int get_master_from_list(char **selected_host, int *selected_port, char *server_list, apr_pool_t *p) {
int ret = -1;
int curr_srv = 0;
char *next = NULL;
char *tmp = NULL;
int port;
port = atoi(strchr(server_list, ':') + 1), sizeof(port);
memcpy(selected_port, &port, sizeof(int));
// the host string must be allocated in the memory pool!
*selected_host = apr_pstrndup(p, server_list, strchr(server_list, ':') - server_list);
return 1;
}
///////////////////////////////////////
// Query Routing basic implementation
///////////////////////////////////////
int query_routing(const char *server_list, const char *sql_command, int procotol_command, int current_slave) {
if (strstr(sql_command, "select ")) {
// to the slave
return SKYSQL_READ;
} else {
// to the master
return SKYSQL_WRITE;
}
}
//////////////////

View File

@ -1,6 +1,6 @@
////////////////////////////////////////
// SKYSQL header file
// By Massimiliano Pinto 2012
// By Massimiliano Pinto 2012/2013
// SkySQL AB
////////////////////////////////////////
@ -137,7 +137,8 @@ typedef enum
#define MAX_CHUNK SMALL_CHUNK * 8 * 4
#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10)
//#define MYSQL_CONN_DEBUG
#define MYSQL_CONN_DEBUG
#undef MYSQL_CONN_DEBUG
typedef struct {
apr_socket_t *socket;

View File

@ -1,6 +1,6 @@
////////////////////////////////////////
// SKYSQL header file
// By Massimiliano Pinto 2012
// By Massimiliano Pinto 2012/2013
// SkySQL AB
////////////////////////////////////////
@ -38,11 +38,17 @@
// getpid
#include <unistd.h>
// mapped I/O
#include <sys/mman.h>
#include "skysql_client.h"
#define SKYSQL_GATEWAY_VERSION "0.0.1"
#define SKYSQL_VERSION "5.5.22-SKY-1.6.5"
#define SKYSQL_READ 0
#define SKYSQL_WRITE 1
#define HTTP_WELCOME_MESSAGE "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\nContent-Type: text/plain\r\n\r\nSKYSQL Gateway " SKYSQL_GATEWAY_VERSION
#define SKYSQL_LISTENER_VERSION "MySQL Community Server (GPL)"
@ -55,6 +61,8 @@
module AP_MODULE_DECLARE_DATA skysql_module;
static unsigned char *config_area=NULL;
//const int SKY_SQL_MAX_PACKET_LEN = 0xffffffL;
typedef struct {
@ -64,6 +72,7 @@ typedef struct {
int protocol_enabled;
int pool_enabled;
char backend_servers[2][128];
char *server_list;
apr_hash_t *resources;
int loop_timeout;
} skysql_server_conf;
@ -96,6 +105,11 @@ typedef struct {
int compress;
} mysql_driver_details;
typedef struct {
int num;
char *list;
} backend_list;
int skysql_ext_file_ver();
int skysql_query_is_select(const char *query);
apr_status_t skysql_read_client_autentication(conn_rec *c, apr_pool_t *pool, uint8_t *scramble, int scramble_len, skysql_client_auth *mysql_client_data, uint8_t *stage1_hash);
@ -105,10 +119,25 @@ apr_status_t skysql_send_error (conn_rec *c, uint8_t packet_number, MYSQL_conn *
apr_status_t skysql_send_ok(conn_rec *c, apr_pool_t *p, uint8_t packet_number, uint8_t in_affected_rows, const char* skysql_message);
apr_status_t skysql_send_eof(conn_rec *c, apr_pool_t *p, uint8_t packet_number);
apr_status_t skysql_send_result(conn_rec *c, uint8_t *data, uint8_t len);
int select_random_slave_server(int nslaves);
int select_random_slave_server(const char *server_listi, int *num_slaves);
apr_status_t gateway_send_error (conn_rec *c, apr_pool_t *p, uint8_t packet_number);
apr_status_t gateway_reply_data(conn_rec *c, apr_pool_t *pool, void *data, int len);
char *gateway_find_user_password_sha1(char *username, void *repository, conn_rec *c, apr_pool_t *p);
void skysql_sha1_str(const uint8_t *in, int in_len, uint8_t *out);
int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query);
//apr_status_t skysql_change_user(conn_rec *c, apr_pool_t *p, char *username, char *database, MYSQL *conn, uint8_t *stage1_hash);
char *bin2hex(char *out, const uint8_t *in, unsigned int len);
void skysql_sha1_2_str(const uint8_t *in, int in_len, const uint8_t *in2, int in2_len, uint8_t *out);
void skysql_str_xor(char *output, const uint8_t *input1, const uint8_t *input2, unsigned int len);
int get_server_from_list(char **selected_host, int *selected_port, char *server_list, int num, apr_pool_t *p);
int get_master_from_list(char **selected_host, int *selected_port, char *server_list, apr_pool_t *p);
int mysql_pass_packet(MYSQL_conn *conn, const char *command, int len);
int mysql_receive_packet(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn);
int skygateway_statement_prepare_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query, int len);
int skygateway_statement_execute_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query, int len);
int mysql_send_command(MYSQL_conn *conn, const char *command, int cmd, int len);
apr_status_t skysql_change_user(conn_rec *c, apr_pool_t *p, char *username, char *database, MYSQL_conn *conn, uint8_t *stage1_hash);
int query_routing(const char *server_list, const char *sql_command, int procotol_command, int current_slave);
unsigned int mysql_errno(MYSQL_conn *mysql);
const char *mysql_error(MYSQL_conn *mysql);
const char *mysql_sqlstate(MYSQL_conn *mysql);
int mysql_query(MYSQL_conn *conn, const char *query);

View File

@ -1,6 +1,6 @@
////////////////////////////////////////
// SKYSQL Utils
// By Massimiliano Pinto 2012
// By Massimiliano Pinto 2012/2013
// SkySQL AB
////////////////////////////////////////
@ -143,10 +143,10 @@ int skysql_check_scramble(conn_rec *c, apr_pool_t *p, uint8_t *token, unsigned i
uint8_t check_hash[APR_SHA1_DIGESTSIZE];
char hex_double_sha1[2 * APR_SHA1_DIGESTSIZE + 1]="";
uint8_t *password = gateway_find_user_password_sha1("pippo", NULL, c, p);
uint8_t *password = gateway_find_user_password_sha1(username, NULL, c, p);
bin2hex(hex_double_sha1, password, APR_SHA1_DIGESTSIZE);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "stored hex(SHA1(SHA1(password))) [%s]", hex_double_sha1);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "The Gateway stored hex(SHA1(SHA1(password))) for \"%s\" [%s]", username, hex_double_sha1);
// possible, now skipped
/*
@ -168,6 +168,12 @@ int skysql_check_scramble(conn_rec *c, apr_pool_t *p, uint8_t *token, unsigned i
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SHA1 di SHA1(client password) [%s]", check_hash);
if (1) {
char inpass[100]="";
bin2hex(inpass, check_hash, 20);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "The CLIENT hex(SHA1(SHA1(password))) for \"%s\" [%s]", username, inpass);
}
return memcmp(password, check_hash, APR_SHA1_DIGESTSIZE);
}
@ -846,6 +852,9 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const
apr_status_t poll_rv;
int is_eof = 0;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending query to backend [%lu] ...", conn->tid);
// send the query to the backend
query_ret = mysql_query(conn, query);
if (query_ret) {
@ -855,16 +864,14 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const
return 1;
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending result set ...");
//poll_rv = apr_pollset_create(&pset, 1, p, 0);
poll_rv = apr_pollset_create(&pset, 1, p, 0);
pfd.p = p;
pfd.desc_type = APR_POLL_SOCKET;
pfd.reqevents = APR_POLLIN;
pfd.rtnevents = APR_POLLIN;
pfd.desc.s = conn->socket;
pfd.client_data = NULL;
//pfd.p = p;
//pfd.desc_type = APR_POLL_SOCKET;
//pfd.reqevents = APR_POLLIN;
//pfd.rtnevents = APR_POLLIN;
//pfd.desc.s = conn->socket;
//pfd.client_data = NULL;
//rv = apr_pollset_add(pset, &pfd);
@ -872,6 +879,9 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const
apr_socket_timeout_set(conn->socket, 100000000);
// read query resut from backend
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is receiving query result from backend ...");
while(1) {
char errmesg_p[1000]="";
bytes=MAX_CHUNK;
@ -884,8 +894,6 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const
//fprintf(stderr, "wait Errore in recv, rv is %i, [%s]\n", rv, errmesg_p);
//fflush(stderr);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is receiving ...");
//apr_socket_atreadeof(conn->socket, &is_eof);
rv = apr_socket_recv(conn->socket, buffer, &bytes);
@ -920,13 +928,13 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const
apr_brigade_destroy(bb1);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive, brigade sent with %li bytes", bytes);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive, brigade sent to the client with %li bytes", bytes);
cycle++;
if (bytes < MAX_CHUNK) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: less bytes than buffer here, Return from query result: total bytes %lu in %i", tot_bytes, cycle);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: less bytes than buffer here, Query Result: total bytes %lu in %i", tot_bytes, cycle);
return 0;
}
@ -1339,3 +1347,15 @@ int mysql_receive_packet(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn) {
return 0;
}
backend_list select_backend_servers() {
backend_list l;
memset(&l, '\0', sizeof(backend_list));
l.num = 2;
l.list = "127.0.0.1:3307,127.0.0.1:3306,xxxx:11";
return l;
}