diff --git a/protocol_1.0/.deps b/protocol_1.0/.deps new file mode 100644 index 000000000..e69de29bb diff --git a/protocol_1.0/Makefile b/protocol_1.0/Makefile new file mode 100644 index 000000000..0d6078c15 --- /dev/null +++ b/protocol_1.0/Makefile @@ -0,0 +1,50 @@ +## +## Makefile -- Build procedure for sample skysql Apache module +## Autogenerated via ``apxs -n skysql -g''. +## + +builddir=. +top_srcdir=/packages/inst/apache_2.4.2 +top_builddir=/packages/inst/apache_2.4.2 +include /packages/inst/apache_2.4.2/build/special.mk + +# the used tools +APXS=apxs +APACHECTL=apachectl + +# additional defines, includes and libraries +#DEFS=-Dmy_define=my_value + +#INCLUDES= + +#LIBS=-lskysqlclient -lmysqlcompat + +#LDFLAGS= + +# the default target +all: local-shared-build + +# install the shared object file into Apache +install: install-modules-yes + +# cleanup +clean: + -rm -f mod_skysql.o mod_skysql.lo mod_skysql.slo mod_skysql.la skysql_utils.o skysql_utils.lo skysql_utils.slo skysql_utils.la + +# simple test +test: reload + lynx -mime_header http://localhost/skysql + +# install and activate shared object by reloading Apache to +# force a reload of the shared object file +reload: install restart + +# the general Apache start/restart/stop +# procedures +start: + $(APACHECTL) start +restart: + $(APACHECTL) restart +stop: + $(APACHECTL) stop + diff --git a/protocol_1.0/README b/protocol_1.0/README new file mode 100644 index 000000000..1b85f2722 --- /dev/null +++ b/protocol_1.0/README @@ -0,0 +1 @@ +Apache 2.4.2 diff --git a/protocol_1.0/mod_skysql.c b/protocol_1.0/mod_skysql.c new file mode 100644 index 000000000..3c7c71a88 --- /dev/null +++ b/protocol_1.0/mod_skysql.c @@ -0,0 +1,1222 @@ +////////////////////////////////// +// SKYSQL GATEWAY main module +// By Massimiliano Pinto 2012 +// SkySQL AB +////////////////////////////////// +// +////////////////////////////////// +// // +// S K Y S Q L G A T E W A Y // +// // +////////////////////////////////// + +#include "skysql_gw.h" + +unsigned int mysql_errno(MYSQL_conn *mysql) { + return 1146U; +} + + +const char *mysql_sqlstate(MYSQL_conn *mysql) { + return "00000"; + +} + +const char *mysql_error(MYSQL_conn *mysql) { + return "error 1111"; +} + +static char *strend(register const char *s) +{ + while (*s++); + return (char*) (s-1); +} + +int mysql_select_db(MYSQL_conn *conn, const char *db) { + apr_status_t rv; + //uint8_t *packet_buffer = NULL; + long bytes; + int ret = 1; + uint8_t packet_buffer[SMALL_CHUNK] = ""; + + // set COMM_INIT_DB + packet_buffer[4]= '\x02'; + strncpy(packet_buffer+4+1, db, SMALL_CHUNK - 1); + + //COMM_INIT_DB + DBNAME = paylod + skysql_set_byte3(packet_buffer, 1 + strlen(packet_buffer+4+1)); + + //packet header + payload = bytes to send + bytes = 4 + 1 + strlen(packet_buffer+4+1); + + // send to server + rv = apr_socket_send(conn->socket, packet_buffer, &bytes); + + if (rv != APR_SUCCESS) { + fprintf(stderr, "Errore in send query\n"); + fflush(stderr); + return 1; + } + + // now read the response from server + bytes = SMALL_CHUNK; + + memset(&packet_buffer, '\0', sizeof(packet_buffer)); + + rv = apr_socket_recv(conn->socket, packet_buffer, &bytes); + ret = packet_buffer[4]; + + if (rv != APR_SUCCESS) { + fprintf(stderr, "Errore in recv\n"); + fflush(stderr); + return 1; + } + + if (ret == '\x00') + return 0; + else + return ret; +} + +/////////////////////////////////////// +// MYSQL_conn structure setup +// A new standalone pool is allocated +/////////////////////////////////////// +static MYSQL_conn *mysql_init(MYSQL_conn *input) { + apr_pool_t *pool = NULL; + apr_status_t rv = -1; + + if (input == NULL) { + // structure alloction + input = calloc(1, sizeof(MYSQL_conn)); + if (input == NULL) + return NULL; + // new pool created + rv = apr_pool_create_core(&pool); + + if (rv != APR_SUCCESS) { + +#ifdef MYSQL_CONN_DEBUG + fprintf(stderr, "MYSQL_INIT: apr_pool_create_core FAILED\n"; + fflush(stderr); +#endif + free(input); + return NULL; + } + + // the structure now has the pool + input->pool = pool; + } + + return input; +} + +///////////////////////////////////// +// Send COM_QUIT to server +// Close socket +// free the pool +// free main pointer +///////////////////////////////////// +static void mysql_close(MYSQL_conn *conn) { + apr_status_t rv; + uint8_t packet_buffer[5]; + long bytes = 5; + + // Packet # is 0 + packet_buffer[3]= '\x00'; + + // COM_QUIT is \x01 + packet_buffer[4]= '\x01'; + + // set packet length to 1 + skysql_set_byte3(packet_buffer, 1); + + // send COM_QUIT + rv = apr_socket_send(conn->socket, packet_buffer, &bytes); + + // close socket + apr_socket_close(conn->socket); + + // pool destroy + apr_pool_destroy(conn->pool); + +#ifdef MYSQL_CONN_DEBUG + fprintf(stderr, "Open/Close Connection %lu to backend closed/cleaned\n", conn->tid); + fflush(stderr); +#endif + // free structure pointer + if (conn) + free(conn); +} + +int mysql_query(MYSQL_conn *conn, const char *query) { + apr_status_t rv; + //uint8_t *packet_buffer=NULL; + uint8_t packet_buffer[SMALL_CHUNK]; + long bytes; + int fd; + + //packet_buffer = (uint8_t *) calloc(1, 5 + strlen(query) + 1); + memset(&packet_buffer, '\0', sizeof(packet_buffer)); + + packet_buffer[4]= '\x03'; + strcpy(packet_buffer+5, query); + + skysql_set_byte3(packet_buffer, 1 + strlen(query)); + + bytes = 4 + 1 + strlen(query); + + fprintf(stderr, "THE QUERY is [%s] len %i\n", query, bytes); + fprintf(stderr, "THE QUERY TID is [%lu]", conn->tid); + fprintf(stderr, "THE QUERY scramble is [%s]", conn->scramble); + if (conn->socket == NULL) { + fprintf(stderr, "***** THE QUERY sock struct is NULL\n"); + } + fwrite(packet_buffer, bytes, 1, stderr); + fflush(stderr); + + apr_os_sock_get(&fd,conn->socket); + + fprintf(stderr, "QUERY Socket FD is %i\n", fd); + + fflush(stderr); + + rv = apr_socket_send(conn->socket, packet_buffer, &bytes); + + fprintf(stderr, "QUERY SENT [%s]\n", query); + fflush(stderr); + + if (rv != APR_SUCCESS) { + fprintf(stderr, "Errore in send query\n"); + fflush(stderr); + return 1; + } + + fprintf(stderr, "Query [%s] sent\n", query); + fflush(stderr); + + return 0; +} + + +int mysql_print_result(MYSQL_conn *conn) { + apr_status_t rv; + uint8_t buffer[MAX_CHUNK]; + long bytes; + + bytes = 1024 * 16; + + memset(buffer, '\0', sizeof(buffer)); + + rv = apr_socket_recv(conn->socket, buffer, &bytes); + + if (rv != APR_SUCCESS) { + fprintf(stderr, "Errore in recv\n"); + fflush(stderr); + return 1; + } + + //fprintf(stderr, "Result with %li columns\n", buffer[4]); + //fwrite(buffer, bytes, 1, stderr); + //fflush(stderr); + + return (int) buffer[4]; +} + +static int mysql_connect(char *host, int port, char *dbname, char *user, char *passwd, MYSQL_conn *conn) { + apr_status_t rv; + int connect = 0; + int ciclo = 0; + char buffer[SMALL_CHUNK]; + uint8_t packet_buffer[SMALL_CHUNK]; + char errmesg[128]; + uint8_t *payload = NULL; + int server_protocol; + char server_version[100]=""; + uint8_t *server_version_end = NULL; + uint16_t skysql_server_capabilities_one; + uint16_t skysql_server_capabilities_two; + int fd; + unsigned long tid =0; + apr_sockaddr_t *connessione; + apr_socket_t *socket = NULL; + long bytes; + uint8_t scramble_data_1[8 + 1] = ""; + uint8_t scramble_data_2[12 + 1] = ""; + uint8_t scramble_data[20 + 1] = ""; + uint8_t capab_ptr[4]; + int scramble_len; + uint8_t scramble[20 + 1]; + uint8_t client_scramble[20 + 1]; + uint8_t client_capabilities[4]; + uint32_t server_capabilities; + uint32_t final_capabilities; + char dbpass[500]=""; + apr_pool_t *pool = NULL; + + pool = conn->pool; + + apr_sockaddr_info_get(&connessione, host, APR_UNSPEC, port, 0, pool); + + if ((rv = apr_socket_create(&socket, connessione->family, SOCK_STREAM, APR_PROTO_TCP, pool)) != APR_SUCCESS) { + fprintf(stderr, "Errore creazione socket: [%s] %i\n", strerror(errno), errno); + exit; + } + + fprintf(stderr, "Socket initialized\n"); + fflush(stderr); + + conn->socket=socket; + + rv = apr_socket_opt_set(socket, APR_TCP_NODELAY, 1); + rv = apr_socket_opt_set(socket, APR_SO_NONBLOCK , 0); + + apr_socket_timeout_set(socket, 355000); + + if ((rv = apr_socket_connect(socket, connessione)) != APR_SUCCESS) { + apr_strerror(rv, errmesg, sizeof(errmesg)); + fprintf(stderr, "Errore connect %i, %s: RV = [%i], [%s]\n", errno, strerror(errno), rv, errmesg); + apr_socket_close(socket); + + return -1; + + } else { + connect = 1; + } + + fprintf(stderr, "CONNECT is DONE\n"); + + apr_os_sock_get(&fd,socket); + + fprintf(stderr, "Socket FD is %i\n", fd); + fflush(stderr); + + memset(&buffer, '\0', sizeof(buffer)); + + bytes = 16384; + + + rv = apr_socket_recv(socket, buffer, &bytes); + + if ( rv == APR_SUCCESS) { + fprintf(stderr, "RESPONSE ciclo %i HO letto [%s] bytes %li\n",ciclo, buffer, bytes); + fflush(stderr); + ciclo++; + } else { + if (APR_STATUS_IS_EOF(rv)) { + fprintf(stderr, "EOF reached. Bytes = %li\n", bytes); + } else { + apr_strerror(rv, errmesg, sizeof(errmesg)); + fprintf(stderr, "###### Receive error FINAL : connection not completed %i %s: RV = [%i], [%s]\n", errno, strerror(errno), rv, errmesg); + + apr_socket_close(socket); + + exit; + } + } + + fwrite(buffer, bytes, 1, stderr); + fflush(stderr); + + //decode mysql handshake + + payload = buffer + 4; + server_protocol= payload[0]; + fprintf(stderr, "Server Protocol [%i]\n", server_protocol); + payload++; + fprintf(stderr, "Protocol Version [%s]\n", payload); + fflush(stderr); + + server_version_end = strend((char*) payload); + payload = server_version_end + 1; + + // TID + tid = skysql_get_byte4(payload); + memcpy(&conn->tid, &tid, 4); + fprintf(stderr, "Thread ID is %lu\n", conn->tid); + fflush(stderr); + + payload +=4; + + // scramble_part 1 + memcpy(scramble_data_1, payload, 8); + payload += 8; + + // 1 filler + payload++; + + skysql_server_capabilities_one = skysql_get_byte2(payload); + fprintf(stderr, "Capab_1[\n"); + fwrite(&skysql_server_capabilities_one, 2, 1, stderr); + fflush(stderr); + + //2 capab_part 1 + 1 language + 2 server_status + payload +=5; + + skysql_server_capabilities_two = skysql_get_byte2(payload); + fprintf(stderr, "]Capab_2[\n"); + fwrite(&skysql_server_capabilities_two, 2, 1, stderr); + fprintf(stderr, "]\n"); + fflush(stderr); + + memcpy(&capab_ptr, &skysql_server_capabilities_one, 2); + fprintf(stderr, "Capab_1[\n"); + fwrite(capab_ptr, 2, 1, stderr); + fflush(stderr); + + memcpy(&(capab_ptr[2]), &skysql_server_capabilities_two, 2); + fprintf(stderr, "Capab_2[\n"); + fwrite(capab_ptr, 2, 1, stderr); + fflush(stderr); + + // 2 capab_part 2 + payload+=2; + + scramble_len = payload[0] -1; + + fprintf(stderr, "Scramble_len [%i]\n", scramble_len); + fflush(stderr); + + payload += 11; + + memcpy(scramble_data_2, payload, scramble_len - 8); + + fprintf(stderr, "Scramble_buff1["); + fwrite(scramble_data_1, 8, 1, stderr); + fprintf(stderr, "]\nScramble_buff2 ["); + fwrite(scramble_data_2, scramble_len - 8, 1, stderr); + fprintf(stderr, "]\n"); + + fflush(stderr); + + memcpy(scramble, scramble_data_1, 8); + memcpy(scramble + 8, scramble_data_2, scramble_len - 8); + + fprintf(stderr, "Full Scramble 20 bytes is [\n"); + fwrite(scramble, 20, 1, stderr); + fprintf(stderr, "\n]\n"); + fflush(stderr); + + memcpy(conn->scramble, scramble, 20); + fprintf(stderr, "Scramble from MYSQL_Conn is [\n"); + fwrite(scramble, 20, 1, stderr); + fprintf(stderr, "\n]\n"); + + fflush(stderr); + + fprintf(stderr, "Now sending user, pass & db\n["); + + fwrite(&server_capabilities, 4, 1, stderr); + fprintf(stderr, "]\n"); + + final_capabilities = skysql_get_byte4((uint8_t *)&server_capabilities); + fprintf(stderr, "CAPABS [%u]\n", final_capabilities); + fflush(stderr); + + memset(packet_buffer, '\0', sizeof(packet_buffer)); + //packet_header(byte3 +1 pack#) + packet_buffer[3] = '\x01'; + + //final_capabilities = 1025669; + final_capabilities |= SKYSQL_CAPABILITIES_PROTOCOL_41; + final_capabilities &= SKYSQL_CAPABILITIES_CLIENT; + + if (passwd != NULL) { + uint8_t hash1[APR_SHA1_DIGESTSIZE]; + uint8_t hash2[APR_SHA1_DIGESTSIZE]; + uint8_t new_sha[APR_SHA1_DIGESTSIZE]; + + skysql_sha1_str(passwd, strlen(passwd), hash1); + fprintf(stderr, "Hash1 [%s]\n", hash1); + skysql_sha1_str(hash1, 20, hash2); + fprintf(stderr, "Hash2 [%s]\n", hash2); + + fprintf(stderr, "SHA1(SHA1(password in hex)\n"); + bin2hex(dbpass, hash2, 20); + fprintf(stderr, "PAss [%s]\n", dbpass); + fflush(stderr); + + skysql_sha1_2_str(scramble, 20, hash2, 20, new_sha); + fprintf(stderr, "newsha [%s]\n", new_sha); + + skysql_str_xor(client_scramble, new_sha, hash1, 20); + + fprintf(stderr, "Client send scramble 20 [\n"); + fwrite(client_scramble, 20, 1, stderr); + fprintf(stderr, "\n]\n"); + fflush(stderr); + + } + + if (dbname == NULL) { + // now without db!! + final_capabilities &= ~SKYSQL_CAPABILITIES_CONNECT_WITH_DB; + } else { + final_capabilities |= SKYSQL_CAPABILITIES_CONNECT_WITH_DB; + } + + + skysql_set_byte4(client_capabilities, final_capabilities); + memcpy(packet_buffer + 4, client_capabilities, 4); + + packet_buffer[4] = '\x8d'; + packet_buffer[5] = '\xa6'; + packet_buffer[6] = '\x0f'; + packet_buffer[7] = '\x00'; + + skysql_set_byte4(packet_buffer + 4 + 4, 16777216); + packet_buffer[12] = '\x08'; + + fprintf(stderr, "User is [%s]\n", user); + fflush(stderr); + + strcpy(packet_buffer+36, user); + + fprintf(stderr, "HERE\n"); + fflush(stderr); + + bytes = 32 + 22 + 1 + 1; + + bytes += strlen(user); + + if (dbname == NULL) { + strcpy(packet_buffer+36 + 5 + 2, "mysql_native_password"); + } else { + if (passwd != NULL) { + *(packet_buffer+36 + 5 + 1) = 20; + memcpy(packet_buffer+36 + 5 + 1 + 1, client_scramble, 20); + strcpy(packet_buffer+36 + 5 + 1 + 1 + 20, dbname); + strcpy(packet_buffer+36 + 5 + 1 + 1 + 20 + strlen(dbname) + 1, "mysql_native_password"); + bytes += 20 + strlen(dbname) + 1; + } else { + strcpy(packet_buffer+36 + 5 + 1 + 1, dbname); + strcpy(packet_buffer+36 + 5 + 1 + 1 + strlen(dbname) + 1, "mysql_native_password"); + bytes += strlen(dbname) + 1; + } + } + + skysql_set_byte3(packet_buffer, bytes); + + bytes += 4; + + rv = apr_socket_send(socket, packet_buffer, &bytes); + + if (rv != APR_SUCCESS) { + fprintf(stderr, "Errore in send\n"); + } + + bytes = 4096; + + memset(buffer, '\0', sizeof (buffer)); + + rv = apr_socket_recv(socket, buffer, &bytes); + + + if (rv != APR_SUCCESS) { + fprintf(stderr, "Errore in recv\n"); + } + + fprintf(stderr, "ok packet\["); + fwrite(buffer, bytes, 1, stderr); + fprintf(stderr, "]\n"); + fflush(stderr); + + if (buffer[4] == '\x00') { + fprintf(stderr, "OK packet received, packet # %i\n", buffer[3]); + fflush(stderr); + return 0; + } + + return 1; +} + +/////////////////////////////////////// +// interaction with apache scoreboard +//message 64 bytes max +/////////////////////////////////////// + +static int update_gateway_child_status(ap_sb_handle_t *sbh, int status, conn_rec *c, apr_bucket_brigade *bb, char *message) { + worker_score *ws = ap_get_scoreboard_worker(sbh); + int old_status = ws->status; + + ws->status = status; + + if (!ap_extended_status) { + return old_status; + } + + ws->last_used = apr_time_now(); + + /* initial pass only, please - in the name of efficiency */ + if (c) { + apr_cpystrn(ws->client, ap_get_remote_host(c, c->base_server->lookup_defaults, REMOTE_NOLOOKUP, NULL), sizeof(ws->client)); + apr_cpystrn(ws->vhost, c->base_server->server_hostname, sizeof(ws->vhost)); + /* Deliberate trailing space - filling in string on WRITE passes */ + apr_cpystrn(ws->request, message, sizeof(ws->request)); + } + + return old_status; +} + +/////////////////////////////////////////////////// +// custom mysqlclose for apache styart new child // +/////////////////////////////////////////////////// + +void child_mysql_close(MYSQL_conn *conn) { + apr_status_t rv; + uint8_t packet_buffer[5]; + long bytes = 5; + + fprintf(stderr, "SkySQL Gateway process ID %lu is exiting\n", getpid()); + fflush(stderr); + if (conn) + mysql_close(conn); +} + +/////////////////////////////////////////////// +// custom mysqsl_close in process_connection // +/////////////////////////////////////////////// + +void my_mysql_close(MYSQL_conn *conn, conn_rec *c) { + int fd=-1; + + apr_os_sock_get(&fd,conn->socket); + + if (fd) { + if (c !=NULL) + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Connection TID %lu to backend server closed", conn->tid); + } else { + if (c !=NULL) + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "init resources free"); + } + + mysql_close(conn); +} + +/////////////////////////////////////////////////////////// +// the mysql protocol implementation at connection level // +/////////////////////////////////////////////////////////// + +static int skysql_process_connection(conn_rec *c) { + apr_bucket_brigade *r_bb; + apr_bucket_brigade *bb; + apr_bucket *b; + apr_bucket *auth_bucket; + apr_bucket *bucket; + apr_status_t rv; + int seen_eos = 0; + int child_stopped_reading = 0; + + //MYSQL_RES *result; + //MYSQL_ROW row; + //MYSQL_STMT *statement = NULL; + int num_fields; + int i; + uint8_t header_result_packet[4]; + uint8_t result_column_count; + int result_version_len = 0; + char *query_from_client = NULL; + int query_from_client_len = 0; + char *client_auth_packet = NULL; + unsigned int query_ret = 0; + int return_data = 0; + int input_read = 0; + unsigned int skysql_errno = 0; + const char *skysql_error_msg = NULL; + const char *skysql_state = NULL; + uint8_t *outbuf = NULL; + uint8_t client_flags[4]; + + int load_balancing_servers = 0; + int current_slave = -1; + + skysql_server_conf *conf; + char *current_slave_server_host = NULL; + int current_slave_server_port = 3306; + skysql_client_auth *mysql_client_data = NULL; + mysql_driver_details *mysql_driver = NULL; + MYSQL_conn *conn = NULL; + apr_pool_t *pool = NULL; + int max_queries_per_connection = 0; + uint8_t mysql_command = 0; + char tmp_buffer[10001]=""; + unsigned long tmp_buffer_len = 0L; + + uint8_t scramble[20]=""; + int scramble_len = 0; + + uint8_t stage1_hash[20 +1] =""; + + conn_details *find_server = NULL; + char *selected_host = NULL; + char *selected_dbname = NULL; + int selected_shard = 0; + int selected_port = 0; + + apr_interval_time_t timeout = 300000000; + + ///////////////////////////////////////// + // basic infos from configuration file + ///////////////////////////////////////// + conf = (skysql_server_conf *)ap_get_module_config(c->base_server->module_config, &skysql_module); + + /////////////////////////////////////////// + // MYSQL Protocol switch in configuration + /////////////////////////////////////////// + if (!conf->protocol_enabled) { + return DECLINED; + } + + /////////////////////////////////////////////// + // now setting the timeout form configuration + /////////////////////////////////////////////// + + if (conf->loop_timeout > 0) { + timeout = conf->loop_timeout * 1000000; + } + + //////////////////////////////////// + // apache scoreboard update + // aka, customizing server-status!! + ///////////////////////////////////// + + ap_time_process_request(c->sbh, START_PREQUEST); + update_gateway_child_status(c->sbh, SERVER_READY, c, NULL, "GATEWAY: MYSQL ready "); + + ////////////////////////// + // now the c->pool is ok + ////////////////////////// + pool = c->pool; + + /////////////////////////////// + // mysql server/client detail + /////////////////////////////// + mysql_client_data = apr_pcalloc(pool, sizeof(skysql_client_auth)); + mysql_driver = apr_pcalloc(pool, sizeof(mysql_driver_details)); + mysql_client_data->driver_details = (mysql_driver_details *) mysql_driver; + + // yeah, one connection + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "I got a connection!, id [%i]", c->id); + + //////////////////////////////////////////////////////////////////////////////// + // default scenario is to perform here protocol handshake and the autentication + //////////////////////////////////////////////////////////////////////////////// + + ////////////////////////////////////// + // MYSQL 5.1/5.5 Compatible handshake + // todo: a return structure with connection data: capabilities, scramble_buff + + update_gateway_child_status(c->sbh, SERVER_BUSY_WRITE, c, NULL, "GATEWAY: MYSQL handshake sent "); + + rv = skysql_send_handshake(c, scramble, &scramble_len); + + update_gateway_child_status(c->sbh, SERVER_BUSY_READ, c, NULL, "GATEWAY: MYSQL Auth read "); + + /////////////////////////////////////// + // now read the client authentication + // and return data structure with client details, dbname, username, and the stage1_hash + // the latest is for further backend authentication with same user/pass + + rv = skysql_read_client_autentication(c, pool, scramble, scramble_len, mysql_client_data, stage1_hash); + + // client authentication data stored + + if (!rv) { + // todo implement custom error packet + // message and return status + skysql_send_ok(c, pool, 2, 0, NULL); + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "*** MySQL Authentication FALSE, thread ID is %i", getpid()); + + return HTTP_INTERNAL_SERVER_ERROR; + } + + update_gateway_child_status(c->sbh, SERVER_BUSY_WRITE, c, NULL, "GATEWAY: MYSQL Auth Done "); + + /////////////////////////////// + // ok, client is autenticated + // akwnoledge it! + /////////////////////////////// + skysql_send_ok(c, pool, 2, 0, NULL); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MySQL Authentication OK, thread ID is %i", getpid()); + + ////////////////////////////// + // check if db is in connect + ////////////////////////////// + if (mysql_driver->connect_with_db) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "DB is in connect packet"); + } + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "current username is [%s]", mysql_client_data->username); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "current DB is [%s]", mysql_client_data->database != NULL ? mysql_client_data->database : ""); + + // now the pool pointer is set to NULL + pool = NULL; + + ////////////////////////// + // check pooling config + ///////////////////////// + + if (!conf->pool_enabled) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MySQL backend open/close"); + conn = mysql_init(NULL); + //memset(&conn2, '\0', sizeof(MYSQL_conn)); + //conn = &conn2; + if (conn == NULL) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL init Error %u: %s", 1, "No memory"); + return 500; + } + + // do the connect + // find config data + find_server = apr_hash_get(conf->resources, "loadbal", APR_HASH_KEY_STRING); + if (find_server != NULL) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find was DONE"); + + //switch(find_server->type) + if (find_server->nshards == 1) { + selected_port = atoi(strchr(find_server->server_list, ':') + 1); + selected_host = apr_pstrndup(c->pool, find_server->server_list, strchr(find_server->server_list, ':') - find_server->server_list); + selected_shard = 1; + } else { + selected_shard = select_random_slave_server(find_server->nshards); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find [%i] servers", find_server->nshards); + get_server_from_list(&selected_host, &selected_port, find_server->server_list, selected_shard, c->pool); + } + } else { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL config find KO: using default!"); + selected_port = 3306; + selected_host = apr_pstrdup(c->pool, "127.0.0.1"); + } + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL backend selection [%i], [%s]:[%i]", selected_shard, selected_host, selected_port); + + if (mysql_client_data->database != NULL) { + selected_dbname = mysql_client_data->database; + } else { + selected_dbname = "test"; + } + + if (mysql_connect(selected_host, selected_port, selected_dbname, mysql_client_data->username, "pippo", conn) != 0) { + //if (mysql_real_connect(conn, "192.168.1.40", "root", "pippo", "test", 3306, NULL, 0) == NULL) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "MYSQL Connect [%s:%i] Error %u: %s", selected_host, selected_port, mysql_errno(conn), mysql_error(conn)); + return 500; + } else { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL RunTime Opened connection to backend"); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Backend Server TID %i, scamble_buf [%5s]", conn->tid, conn->scramble); + } + + } else { + // use the pool + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "MySQL backend pool"); + conn = conf->conn; + } + + update_gateway_child_status(c->sbh, SERVER_BUSY_READ, c, NULL, apr_psprintf(c->pool, "GATEWAY: MYSQL backend selected, DB [%s] ", selected_dbname)); + + ////////////////////////////////////////////////////// + // main loop + // speaking MySQL protocol 5.1/5.5 + ////////////////////////////////////////////////////// + + //////////////////////////////////////////////////// + // here applying the timeout to the current socket + // this protects/saves the main loop + // so ... choose the right value + + apr_socket_timeout_set(ap_get_conn_socket(c), timeout); + + while(1) { + ////////////////////////////////////////////////////////////// + // the new pool is allocated on c->pool + // this new pool is the right one for the while(1) main loop + // it MUST BE destroyed just before exiting the loop, or on + // a break statement + // take care of it + ////////////////////////////////////////////////////////////// + + apr_pool_create(&pool, c->pool); + + r_bb = apr_brigade_create(pool, c->bucket_alloc); + + ///////////////////////// + // reading client input + ///////////////////////// + + child_stopped_reading = 0; + input_read = 0; + + update_gateway_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, c, NULL, apr_psprintf(pool, "GATEWAY: MYSQL loop, DB [%s]", selected_dbname)); + + /////////////////////////////////////////////// + // Get input bytes from the client, blocking + // TODO: handle multi packet input + // or reading larger data input + // yes, this is only one brigade! + /////////////////////////////////////////////// + + if (((rv = ap_get_brigade(c->input_filters, r_bb, AP_MODE_READBYTES, APR_BLOCK_READ, 8192)) != APR_SUCCESS) || APR_BRIGADE_EMPTY(r_bb)) { + char errmsg[256]=""; + // is this an error? + //apr_brigade_cleanup(r_bb); + //apr_brigade_destroy(r_bb); + apr_strerror(rv, errmsg, sizeof(errmsg)); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, ">>> No more data from client, in ap_get_brigade [%s]", errmsg); + + //apr_pool_destroy(pool); + // this breaks the main loop + //break; + } + + ///////////////////////////////////////////// + // now extract data bucket from the brigade + ///////////////////////////////////////////// + + for (bucket = APR_BRIGADE_FIRST(r_bb); bucket != APR_BRIGADE_SENTINEL(r_bb); bucket = APR_BUCKET_NEXT(bucket)) { + apr_size_t len = 0; + const char *data = NULL; + if (APR_BUCKET_IS_EOS(bucket)) { + seen_eos = 1; + break; + } + + if (APR_BUCKET_IS_FLUSH(bucket)) { + continue; + } + + if (child_stopped_reading) { + // the statement breaks this 'for' loop NOT the main loop with 'while'! + break; + } + + /////////////////////////// + // reading a bucket + // what to do with large input data, as 'mysql load data'???? + /////////////////////////// + rv = apr_bucket_read(bucket, &data, &len, APR_BLOCK_READ); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Input data with len [%i]", len); + + if (rv != APR_SUCCESS) { + char errmsg[256]=""; + apr_strerror(rv, errmsg, sizeof(errmsg)); + child_stopped_reading = 1; + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Child stopped reading [%s]", errmsg); + } + //////////////////////////////////////////////////////// + // current data is copied into a pool allocated buffer + //////////////////////////////////////////////////////// + query_from_client = (char *)apr_pstrmemdup(pool, data, len); + query_from_client_len = len; + + input_read = 1; + } + + // let's destroy the brigate, it's useless now + apr_brigade_destroy(r_bb); + + // now handle client input + if (input_read == 1 && query_from_client != NULL) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Client Input [%s], command [%x]", query_from_client+5, query_from_client[4]); + } else { + // no data read + // input buffer NULL or empty + // what to do? + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL Gateway main loop: input is empty, exiting"); + apr_pool_destroy(pool); + break; + } + + //prepare custom error response if max is raised + max_queries_per_connection++; + if (max_queries_per_connection > 1000002) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "max_queries_per_connection reached = %li", max_queries_per_connection); + gateway_send_error(c, pool, 1); + apr_pool_destroy(pool); + + // if (die_on__max_queries_per_connection) + //break; + continue; + } + + // check the mysql thread id, for pre-openend the ti is in conf->tid + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Serving Client with MySQL Thread ID [%lu]", conn->tid); + + mysql_command = query_from_client[4]; + + ///////////////////////////////////// + // now processing the mysql_command + ///////////////////////////////////// + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Client Input command [%x]", mysql_command); + + update_gateway_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, c, NULL, apr_psprintf(pool, "GATEWAY: MYSQL loop Command [%x], DB [%s]", mysql_command, selected_dbname)); + + switch (mysql_command) { + case 0x0e : + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_PING"); + // reponse sent directly to the client + // no ping to backend, for now + skysql_send_ok(c, pool, 1, 0, NULL); + break; + case 0x03 : + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_QUERY"); + skygateway_query_result(c, pool, conn, query_from_client+5); + //skysql_send_ok(c, pool, 1, 0, NULL); + + break; + case 0x02 : + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_INIT_DB"); + //mysql_select_db(conn, query_from_client+5); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_INIT_DB", query_from_client+5); + // reponse sent to the client + skysql_send_ok(c, pool, 1, 0, NULL); + break; + case 0x01 : + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_QUIT"); + // QUIT received + // close backend connection if not pooled + // and exit the switch + + if (!conf->pool_enabled) { + mysql_close(conn); + } + break; + dafault : + ap_log_error(APLOG_MARK, APLOG_ERR, 0, c->base_server, "UNKNOW MYSQL PROTOCOL COMMAND [%x]", mysql_command); + // reponse sent to the client, with custom error: TODO + skysql_send_ok(c, pool, 1, 0, "unknow command"); + break; + } + + ///////////////////////// + // now all is done: destroy immediately all resources in the new poll + // the loop continues with no resources allocated + apr_pool_destroy(pool); + + // if COM_QUIT terminate the main loop! + if (mysql_command == 0x01) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "COM_QUIT has been received, the main loop now ends"); + break; + } else { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "the main loop continues"); + continue; + } + + //////////////////////////// + // main loop now ends + //////////////////////////// + + } + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Main loop ended!"); + + // hey, it was okay to handle the protocol connectioni, is thereanything else to do? + + update_gateway_child_status(c->sbh, SERVER_CLOSING, c, NULL, "GATEWAY: MYSQL quit "); + + ap_time_process_request(c->sbh, STOP_PREQUEST); + + return OK; +} + +///////////////////////////////// +// The sample content handler +// Only with HTTP protocol +// so it's useless now +// will be useful with JSON +//////////////////////////////// +static int skysql_handler(request_rec *r) +{ + if (strcmp(r->handler, "skysql")) { + return DECLINED; + } + r->content_type = "text/html"; + + if (!r->header_only) + ap_rputs("The sample page from mod_skysql.c\n", r); + return OK; +} + +///////////////////////////////// +// Module Initialization +// Persistent structures & data +///////////////////////////////// + +static int skysql_init_module(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp, server_rec *base_server) { + server_rec *s; + + s = base_server; +/* + do initialization here +*/ + ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: Internal structure done"); + ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, "SKYSQL Init: ext file ver is [%i]", skysql_ext_file_ver()); + + + return OK; +} + +//////////////////////////////////////// +// Child Initialization +// If enabled, per child connection(s) +//////////////////////////////////////// + +static void skysql_child_init(apr_pool_t *p, server_rec *s) { + // take care of virtualhosts ... + while(s) { + skysql_server_conf *conf; + conf = (skysql_server_conf *)ap_get_module_config(s->module_config, &skysql_module); + + if (conf->protocol_enabled && conf->pool_enabled) { + + // MySQL Init + conf->conn = mysql_init(NULL); + conf->conn->pool = p; + + // store child process id + conf->gateway_id = getpid(); + + if (conf->conn == NULL) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, s, "MYSQL init Error %u: %s\n", mysql_errno(conf->conn), mysql_error(conf->conn)); + return; + } + if (mysql_connect("127.0.0.1", 3306, "test", "root", "pippo", conf->conn) != 0) { + //if (mysql_real_connect(conf->conn, "192.168.1.40", "root", "pippo", "test", 3306, NULL, 0) == NULL) { + ap_log_error(APLOG_MARK, APLOG_ERR, 0, s, "MYSQL Connect Error %u: %s\n", mysql_errno(conf->conn), mysql_error(conf->conn)); + return ; + } else { + conf->mysql_tid = conf->conn->tid; + ap_log_error(APLOG_MARK, APLOG_ERR, 0, s, "PID %li SkySQL Child Init & Open connection TID %lu to backend", getpid(), conf->mysql_tid); + } + + } else { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "Generic init flags %i, %i, Skip Protocol Setup & Skip database connection", conf->protocol_enabled, conf->pool_enabled); + } + + // next virtual host .. + s = s->next; + } +} + +//////////////////////////////////////// +// Creating defaulf configuration data +//////////////////////////////////////// +static void * create_skysql_config(apr_pool_t *p, server_rec *s) { + skysql_server_conf *ps = apr_pcalloc(p, sizeof(skysql_server_conf)); + ps->conn = NULL; + ps->protocol_enabled = 0; + ps->pool_enabled = 0; + ps->resources = apr_hash_make(p); + ps->loop_timeout = 90; + + return ps; +} + +///////////////////////////// +// Enabling MySQL Protocol // +///////////////////////////// +static const char *skysql_protocol_enable(cmd_parms *cmd, void *dummy, int arg) +{ + skysql_server_conf *sconf = ap_get_module_config(cmd->server->module_config, &skysql_module); + sconf->protocol_enabled = arg; + + return NULL; +} + +///////////////////////////////// +// Enabling MySQL loop timeout // +///////////////////////////////// +static const char *skysql_loop_timeout(cmd_parms *cmd, void *dummy, const char *arg) +{ + skysql_server_conf *sconf = ap_get_module_config(cmd->server->module_config, &skysql_module); + + sconf->loop_timeout = atoi(arg); + + return NULL; +} + +///////////////////////////////////////////// +// Enabling per child persistent connection +///////////////////////////////////////////// +static const char *skysql_pool_enable(cmd_parms *cmd, void *dummy, int arg) +{ + skysql_server_conf *sconf = ap_get_module_config(cmd->server->module_config, &skysql_module); + sconf->pool_enabled = arg; + + return NULL; +} + +static const char *skysql_single_db_resource(cmd_parms *cmd, void *dconf, const char *a1, const char *a2) { + char *ptr_port = NULL; + char *ptr_db = NULL; + char *ptr_host = NULL; + char *ptr_list = NULL; + + skysql_server_conf *conf = ap_get_module_config(cmd->server->module_config, &skysql_module); + + conn_details *newresource = apr_pcalloc(cmd->pool, sizeof(conn_details)); + + newresource->raw_config = apr_pstrdup(cmd->pool, a2); + newresource->name = apr_pstrdup(cmd->pool, a1); + + ptr_db = strchr(a2, ';'); + newresource->server_list = apr_pstrndup(cmd->pool, a2, ptr_db-a2); + newresource->dbname = apr_pstrdup(cmd->pool, ptr_db+1); + + newresource->nshards = 1; + + ptr_list = newresource->server_list; + ptr_host = ptr_list; + while((ptr_host = strchr(ptr_list, ',')) != NULL) { + newresource->nshards++; + ptr_list = ptr_host + 1; + } + // now put the struct in the hash table + apr_hash_set(conf->resources, apr_pstrdup(cmd->pool, a1), APR_HASH_KEY_STRING, newresource); + + // creare un contenitore, table??? da agganciare con la key a1 e value a2 + fprintf(stderr, "Config Resource %s with %i servers, [%s]\n", a1, newresource->nshards, newresource->server_list); + fflush(stderr); + + return NULL; +} + +////////////////////////////// +// commands implemeted here // +////////////////////////////// +static const command_rec skysql_cmds[] = { + AP_INIT_FLAG("SkySQLProtocol", skysql_protocol_enable, NULL, RSRC_CONF, "Run an MYSQL protocol on this host"), + AP_INIT_FLAG("SkySQLPool", skysql_pool_enable, NULL, RSRC_CONF, "SKYSQL backend servers pool"), + AP_INIT_TAKE2("SkySQLSingleDBbresource", skysql_single_db_resource, NULL, OR_FILEINFO, "a single db resource name"), + AP_INIT_TAKE1("SkySQLTimeout", skysql_loop_timeout, NULL, OR_FILEINFO, "MYSQL protocol loop timeout"), + // SkySQLMaxQueryPerConnection + {NULL} +}; + +//////////////////////////// +// hooks implemented here // +//////////////////////////// +static void skysql_register_hooks(apr_pool_t *p) +{ + ap_hook_post_config(skysql_init_module, NULL,NULL, APR_HOOK_MIDDLE); + ap_hook_child_init(skysql_child_init, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_process_connection(skysql_process_connection, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_handler(skysql_handler, NULL, NULL, APR_HOOK_MIDDLE); +} + +///////////////////////////////// +// Dispatch list for API hooks // +///////////////////////////////// + +module AP_MODULE_DECLARE_DATA skysql_module = { + STANDARD20_MODULE_STUFF, + NULL, /* create per-dir config structures */ + NULL, /* merge per-dir config structures */ + create_skysql_config, /* create per-server config structures */ + NULL, /* merge per-server config structures */ + skysql_cmds, /* table of config file commands */ + skysql_register_hooks /* register hooks */ +}; + +/////////////////////////////////////////////// diff --git a/protocol_1.0/modules.mk b/protocol_1.0/modules.mk new file mode 100644 index 000000000..db810d1dd --- /dev/null +++ b/protocol_1.0/modules.mk @@ -0,0 +1,4 @@ +mod_skysql.la: mod_skysql.slo skysql_utils.slo skysql_backend.slo + $(SH_LINK) -rpath $(libexecdir) -module -avoid-version mod_skysql.lo skysql_utils.lo skysql_backend.lo +DISTCLEAN_TARGETS = modules.mk +shared = mod_skysql.la diff --git a/protocol_1.0/skysql_backend.c b/protocol_1.0/skysql_backend.c new file mode 100644 index 000000000..4d66e768a --- /dev/null +++ b/protocol_1.0/skysql_backend.c @@ -0,0 +1,64 @@ +//////////////////////////////////////// +// SKYSQL Backend +// By Massimiliano Pinto 2012 +// SkySQL AB +//////////////////////////////////////// + +#include "skysql_gw.h" + +#define SKYSQL_READ 0 +#define SKYSQL_WRITE 1 + +int skysql_query_is_select(const char *query) { + + return SKYSQL_READ; +} + +int skysql_ext_file_ver(void) { + int ret = 13; + return ret; +} + +int select_random_slave_server(int nslaves) { + int random_balancer = (int) ((nslaves) * (rand() / (RAND_MAX + 1.0))); + return random_balancer; +} + +int get_server_from_list(char **selected_host, int *selected_port, char *server_list, int num, apr_pool_t *p) { + int ret = -1; + int curr_srv = 0; + char *next = NULL; + char *tmp = NULL; + int port; + + if (num == 0) { + port = atoi(strchr(server_list, ':') + 1), sizeof(port); + memcpy(selected_port, &port, sizeof(int)); + *selected_host = apr_pstrndup(p, server_list, strchr(server_list, ':') - server_list); + + return 1; + } + + next = server_list; + + while (curr_srv < num) { + tmp = strchr(next, ','); + if (tmp != NULL) { + curr_srv++; + next = tmp+1; + } else { + return -1; + } + + if (curr_srv == num) { + port = atoi(strchr(next, ':') + 1); + memcpy(selected_port, &port, sizeof(port)); + *selected_host = apr_pstrndup(p, next, strchr(next, ':') - next); + ret = 0; + + break; + } + } + + return ret; +} diff --git a/protocol_1.0/skysql_client.h b/protocol_1.0/skysql_client.h new file mode 100644 index 000000000..bbf0f4ed2 --- /dev/null +++ b/protocol_1.0/skysql_client.h @@ -0,0 +1,134 @@ +//////////////////////////////////////// +// SKYSQL header file +// By Massimiliano Pinto 2012 +// SkySQL AB +//////////////////////////////////////// + +#include "ap_config.h" +#include "ap_mmn.h" +#include "httpd.h" +#include "http_core.h" +#include "http_main.h" +#include "http_config.h" +#include "http_connection.h" +#include "http_request.h" +#include "http_log.h" +#include "http_protocol.h" +#include "ap_config_auto.h" +#include "http_connection.h" + +#include "util_filter.h" +#include "util_script.h" +#include "apr.h" +#include "apr_general.h" +#include "apr_buckets.h" +#include "apr_optional.h" +#include "apr_strings.h" +#include "apr_tables.h" +#include "apr_lib.h" +#include "apr_fnmatch.h" +#include "apr_strings.h" +#include "apr_dbm.h" +#include "apr_rmm.h" +#include "apr_shm.h" +#include "apr_global_mutex.h" +#include "apr_time.h" +#include "scoreboard.h" + +// sha1 +#include "apr_sha1.h" + +// getpid +#include + + +/* Protocol packing macros. */ +#define skysql_set_byte2(__buffer, __int) do { \ + (__buffer)[0]= (uint8_t)((__int) & 0xFF); \ + (__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); } while (0) +#define skysql_set_byte3(__buffer, __int) do { \ + (__buffer)[0]= (uint8_t)((__int) & 0xFF); \ + (__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); \ + (__buffer)[2]= (uint8_t)(((__int) >> 16) & 0xFF); } while (0) +#define skysql_set_byte4(__buffer, __int) do { \ + (__buffer)[0]= (uint8_t)((__int) & 0xFF); \ + (__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); \ + (__buffer)[2]= (uint8_t)(((__int) >> 16) & 0xFF); \ + (__buffer)[3]= (uint8_t)(((__int) >> 24) & 0xFF); } while (0) + + +/* Protocol unpacking macros. */ +#define skysql_get_byte2(__buffer) \ + (uint16_t)((__buffer)[0] | \ + ((__buffer)[1] << 8)) +#define skysql_get_byte3(__buffer) \ + (uint32_t)((__buffer)[0] | \ + ((__buffer)[1] << 8) | \ + ((__buffer)[2] << 16)) +#define skysql_get_byte4(__buffer) \ + (uint32_t)((__buffer)[0] | \ + ((__buffer)[1] << 8) | \ + ((__buffer)[2] << 16) | \ + ((__buffer)[3] << 24)) +#define skysql_get_byte8(__buffer) \ + ((uint64_t)(__buffer)[0] | \ + ((uint64_t)(__buffer)[1] << 8) | \ + ((uint64_t)(__buffer)[2] << 16) | \ + ((uint64_t)(__buffer)[3] << 24) | \ + ((uint64_t)(__buffer)[4] << 32) | \ + ((uint64_t)(__buffer)[5] << 40) | \ + ((uint64_t)(__buffer)[6] << 48) | \ + ((uint64_t)(__buffer)[7] << 56)) + +typedef enum +{ + SKYSQL_CAPABILITIES_NONE= 0, + SKYSQL_CAPABILITIES_LONG_PASSWORD= (1 << 0), + SKYSQL_CAPABILITIES_FOUND_ROWS= (1 << 1), + SKYSQL_CAPABILITIES_LONG_FLAG= (1 << 2), + SKYSQL_CAPABILITIES_CONNECT_WITH_DB= (1 << 3), + SKYSQL_CAPABILITIES_NO_SCHEMA= (1 << 4), + SKYSQL_CAPABILITIES_COMPRESS= (1 << 5), + SKYSQL_CAPABILITIES_ODBC= (1 << 6), + SKYSQL_CAPABILITIES_LOCAL_FILES= (1 << 7), + SKYSQL_CAPABILITIES_IGNORE_SPACE= (1 << 8), + SKYSQL_CAPABILITIES_PROTOCOL_41= (1 << 9), + SKYSQL_CAPABILITIES_INTERACTIVE= (1 << 10), + SKYSQL_CAPABILITIES_SSL= (1 << 11), + SKYSQL_CAPABILITIES_IGNORE_SIGPIPE= (1 << 12), + SKYSQL_CAPABILITIES_TRANSACTIONS= (1 << 13), + SKYSQL_CAPABILITIES_RESERVED= (1 << 14), + SKYSQL_CAPABILITIES_SECURE_CONNECTION= (1 << 15), + SKYSQL_CAPABILITIES_MULTI_STATEMENTS= (1 << 16), + SKYSQL_CAPABILITIES_MULTI_RESULTS= (1 << 17), + SKYSQL_CAPABILITIES_PS_MULTI_RESULTS= (1 << 18), + SKYSQL_CAPABILITIES_PLUGIN_AUTH= (1 << 19), + SKYSQL_CAPABILITIES_SSL_VERIFY_SERVER_CERT= (1 << 30), + SKYSQL_CAPABILITIES_REMEMBER_OPTIONS= (1 << 31), + SKYSQL_CAPABILITIES_CLIENT= (SKYSQL_CAPABILITIES_LONG_PASSWORD | + SKYSQL_CAPABILITIES_FOUND_ROWS | + SKYSQL_CAPABILITIES_LONG_FLAG | + SKYSQL_CAPABILITIES_CONNECT_WITH_DB | + SKYSQL_CAPABILITIES_LOCAL_FILES | + SKYSQL_CAPABILITIES_PLUGIN_AUTH | + SKYSQL_CAPABILITIES_TRANSACTIONS | + SKYSQL_CAPABILITIES_PROTOCOL_41 | + SKYSQL_CAPABILITIES_MULTI_STATEMENTS | + SKYSQL_CAPABILITIES_MULTI_RESULTS | + SKYSQL_CAPABILITIES_PS_MULTI_RESULTS | + SKYSQL_CAPABILITIES_SECURE_CONNECTION) +} skysql_capabilities_t; + + +#define SMALL_CHUNK 1024 +#define MAX_CHUNK SMALL_CHUNK * 16 +#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10) + +typedef struct { + apr_socket_t *socket; + char scramble[33]; + uint32_t server_capabs; + uint32_t client_capabs; + unsigned long tid; + apr_pool_t *pool; +} MYSQL_conn; diff --git a/protocol_1.0/skysql_gw.h b/protocol_1.0/skysql_gw.h new file mode 100644 index 000000000..fadca928e --- /dev/null +++ b/protocol_1.0/skysql_gw.h @@ -0,0 +1,113 @@ +//////////////////////////////////////// +// SKYSQL header file +// By Massimiliano Pinto 2012 +// SkySQL AB +//////////////////////////////////////// + +#include "ap_config.h" +#include "ap_mmn.h" +#include "httpd.h" +#include "http_core.h" +#include "http_main.h" +#include "http_config.h" +#include "http_connection.h" +#include "http_request.h" +#include "http_log.h" +#include "http_protocol.h" +#include "ap_config_auto.h" +#include "http_connection.h" + +#include "util_filter.h" +#include "util_script.h" +#include "apr.h" +#include "apr_general.h" +#include "apr_buckets.h" +#include "apr_optional.h" +#include "apr_strings.h" +#include "apr_tables.h" +#include "apr_lib.h" +#include "apr_fnmatch.h" +#include "apr_strings.h" +#include "apr_dbm.h" +#include "apr_rmm.h" +#include "apr_shm.h" +#include "apr_global_mutex.h" +#include "apr_time.h" +#include "scoreboard.h" + +// getpid +#include + +#include "skysql_client.h" + +#define SKYSQL_GATEWAY_VERSION "0.0.1" +#define SKYSQL_VERSION "5.5.22-SKY-1.6.5" + +#define HTTP_WELCOME_MESSAGE "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\nContent-Type: text/plain\r\n\r\nSKYSQL Gateway " SKYSQL_GATEWAY_VERSION + +#define SKYSQL_LISTENER_VERSION "MySQL Community Server (GPL)" +#define SKYSQL_PROTOCOL_VERSION 10 // version is 10 +#define SKYSQL_THREAD_ID 11 +#define SKYSQL_HANDSKAKE_FILLER 0x00 +#define SKYSQL_SERVER_CAPABILITIES_BYTE1 0xff +#define SKYSQL_SERVER_CAPABILITIES_BYTE2 0xf7 +#define SKYSQL_SERVER_LANGUAGE 0x08 + +module AP_MODULE_DECLARE_DATA skysql_module; + +//const int SKY_SQL_MAX_PACKET_LEN = 0xffffffL; + +typedef struct { + MYSQL_conn *conn; + unsigned long mysql_tid; + unsigned long gateway_id; + int protocol_enabled; + int pool_enabled; + char backend_servers[2][128]; + apr_hash_t *resources; + int loop_timeout; +} skysql_server_conf; + +typedef struct +{ + char *name; + char *raw_config; + char *server_list; + int r_port; + char *dbname; + char *defaults; + int nshards; +} conn_details; + +typedef struct { + char *driver_name; + char *username; + char *password; + char *database; + void *driver_details; +} skysql_client_auth; + +typedef struct { + uint8_t client_flags[4]; + uint8_t max_packet_size[4]; + uint8_t charset; + uint8_t scramble_buff; + int connect_with_db; +} mysql_driver_details; + +int skysql_ext_file_ver(); +int skysql_query_is_select(const char *query); +apr_status_t skysql_read_client_autentication(conn_rec *c, apr_pool_t *pool, uint8_t *scramble, int scramble_len, skysql_client_auth *mysql_client_data, uint8_t *stage1_hash); +apr_status_t skysql_send_handshake(conn_rec *c, uint8_t *scramble, int *scramble_len); +apr_status_t skysql_send_error (conn_rec *c, uint8_t packet_number, MYSQL_conn *conn); +//apr_status_t skysql_prepare_ok(conn_rec *c, uint8_t packet_number, MYSQL_STMT *statement, MYSQL_conn *conn); +apr_status_t skysql_send_ok(conn_rec *c, apr_pool_t *p, uint8_t packet_number, uint8_t in_affected_rows, const char* skysql_message); +apr_status_t skysql_send_eof(conn_rec *c, uint8_t packet_number); +apr_status_t skysql_send_result(conn_rec *c, uint8_t *data, uint8_t len); +int select_random_slave_server(int nslaves); +apr_status_t gateway_send_error (conn_rec *c, apr_pool_t *p, uint8_t packet_number); +apr_status_t gateway_reply_data(conn_rec *c, apr_pool_t *pool, void *data, int len); +char *gateway_find_user_password_sha1(char *username, void *repository, conn_rec *c, apr_pool_t *p); +void skysql_sha1_str(const uint8_t *in, int in_len, uint8_t *out); +int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query); +//apr_status_t skysql_change_user(conn_rec *c, apr_pool_t *p, char *username, char *database, MYSQL *conn, uint8_t *stage1_hash); diff --git a/protocol_1.0/skysql_utils.c b/protocol_1.0/skysql_utils.c new file mode 100644 index 000000000..458ab55fe --- /dev/null +++ b/protocol_1.0/skysql_utils.c @@ -0,0 +1,863 @@ +//////////////////////////////////////// +// SKYSQL Utils +// By Massimiliano Pinto 2012 +// SkySQL AB +//////////////////////////////////////// + +#include "skysql_gw.h" +#include "apr_sha1.h" + +#define MYSQL_PROTOCOL_VERSION41_CHAR '*' + +#define char_val(X) (X >= '0' && X <= '9' ? X-'0' :\ + X >= 'A' && X <= 'Z' ? X-'A'+10 :\ + X >= 'a' && X <= 'z' ? X-'a'+10 :\ + '\177') + +char hex_upper[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; +char hex_lower[] = "0123456789abcdefghijklmnopqrstuvwxyz"; + +///////////////////////////////// +// binary data to hex string +// output must be pre allocated +///////////////////////////////// +char *bin2hex(char *out, const uint8_t *in, unsigned int len) { + const uint8_t *in_end= in + len; + if (len == 0 || in == NULL) { + return NULL; + } + + for (; in != in_end; ++in) { + *out++= hex_upper[((uint8_t) *in) >> 4]; + *out++= hex_upper[((uint8_t) *in) & 0x0F]; + } + *out= '\0'; + + return out; +} + +///////////////////////////////// +// hex string to binary data +// output must be pre allocated +///////////////////////////////// +int hex2bin(uint8_t *out, const char *in, unsigned int len) { + const char *in_end= in + len; + + if (len == 0 || in == NULL) { + return 1; + } + + while (in < in_end) { + register char tmp_ptr = char_val(*in++); + *out++= (tmp_ptr << 4) | char_val(*in++); + } + + return 0; +} + +///////////////////////////////// +// general random string +// output must be pre allocated +///////////////////////////////// +void skysql_set_random_str(uint8_t *output, unsigned int length) { + uint8_t *ptr = output; + apr_status_t rv = apr_generate_random_bytes(output, length); + + // this is for debug, the same scramble for every handshake + //strcpy(output, "12345678abcdefjhilmn"); +} + +///////////////////////////////////////////////////////////// +// fill a 20 bytes preallocated with SHA1 digest (160 bits) +// for one input on in_len bytes +///////////////////////////////////////////////////////////// +void skysql_sha1_str(const uint8_t *in, int in_len, uint8_t *out) { + int l; + apr_sha1_ctx_t context; + apr_byte_t digest[APR_SHA1_DIGESTSIZE]; + + apr_sha1_init(&context); + apr_sha1_update(&context, in, in_len); + apr_sha1_final(digest, &context); + + memcpy(out, digest, APR_SHA1_DIGESTSIZE); +} + +///////////////////////////////////////////////////////////// +// fill 20 bytes preallocated with SHA1 digest (160 bits) +// for two inputs, in_len and in2_len bytes +///////////////////////////////////////////////////////////// +void skysql_sha1_2_str(const uint8_t *in, int in_len, const uint8_t *in2, int in2_len, uint8_t *out) { + int l; + apr_sha1_ctx_t context; + apr_byte_t digest[APR_SHA1_DIGESTSIZE]; + + apr_sha1_init(&context); + apr_sha1_update(&context, in, in_len); + apr_sha1_update(&context, in2, in2_len); + apr_sha1_final(digest, &context); + + memcpy(out, digest, APR_SHA1_DIGESTSIZE); +} + +/////////////////////////////////////////////////////// +// fill a preallocated buffer with XOR(str1, str2) +// XOR between 2 equal len strings +// note that XOR(str1, XOR(str1 CONCAT str2)) == str2 +// and that XOR(str1, str2) == XOR(str2, str1) +/////////////////////////////////////////////////////// + +void skysql_str_xor(char *output, const uint8_t *input1, const uint8_t *input2, unsigned int len) { + const uint8_t *input1_end = NULL; + input1_end = input1 + len; + while (input1 < input1_end) + *output++= *input1++ ^ *input2++; + + *output = '\0'; +} + +////////////////////////////////////////// +// get skygateway password from username +// output is SHA1(SHA1(password)) +////////////////////////////////////////// + +char *gateway_find_user_password_sha1(char *username, void *repository, conn_rec *c, apr_pool_t *p) { + + uint8_t hash1[APR_SHA1_DIGESTSIZE]; + uint8_t hash2[APR_SHA1_DIGESTSIZE]; + + skysql_sha1_str(username, strlen(username), hash1); + skysql_sha1_str(hash1, APR_SHA1_DIGESTSIZE, hash2); + + return apr_pstrmemdup(p, hash2, APR_SHA1_DIGESTSIZE); +} + +///////////////////////////////////////////// +// get the SHA1(SHA1(password)) from client +///////////////////////////////////////////// + +int skysql_check_scramble(conn_rec *c, apr_pool_t *p, uint8_t *token, unsigned int token_len, uint8_t *scramble, unsigned int scramble_len, char *username, uint8_t *stage1_hash) { + uint8_t step1[APR_SHA1_DIGESTSIZE]; + uint8_t step2[APR_SHA1_DIGESTSIZE +1]; + uint8_t check_hash[APR_SHA1_DIGESTSIZE]; + char hex_double_sha1[2 * APR_SHA1_DIGESTSIZE + 1]=""; + + uint8_t *password = gateway_find_user_password_sha1("pippo", NULL, c, p); + + bin2hex(hex_double_sha1, password, APR_SHA1_DIGESTSIZE); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "stored hex(SHA1(SHA1(password))) [%s]", hex_double_sha1); + + // possible, now skipped + /* + if (password == NULL) { + ?????? + } + */ + + // step 1 + skysql_sha1_2_str(scramble, scramble_len, password, APR_SHA1_DIGESTSIZE, step1); + + //step2 + skysql_str_xor(step2, token, step1, token_len); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "skygateway SHA1(password) [%s]", step2); + memcpy(stage1_hash, step2, 20); + + skysql_sha1_str(step2, APR_SHA1_DIGESTSIZE, check_hash); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SHA1 di SHA1(client password) [%s]", check_hash); + + return memcmp(password, check_hash, APR_SHA1_DIGESTSIZE); +} + +apr_status_t gateway_reply_data(conn_rec *c, apr_pool_t *pool, void *data, int len) { + apr_status_t rv = APR_SUCCESS; + apr_bucket_brigade *bb; + apr_bucket_brigade *r_bb; + + // create brigade + bb = apr_brigade_create(pool, c->bucket_alloc); + + apr_brigade_write(bb, ap_filter_flush, c->output_filters, data, len); + ap_fflush(c->output_filters, bb); + + apr_brigade_destroy(bb); + + return 1; +} + +apr_status_t skysql_change_user(conn_rec *c, apr_pool_t *p, char *username, char *database, MYSQL_conn *conn, uint8_t *stage1_hash) { + uint8_t skysql_payload_size = 0; + uint8_t skysql_packet_header[4]; + uint8_t skysql_packet_id = 0; + uint8_t change_user_command = 0x11; + uint8_t *outbuf = NULL; + uint8_t token[20 + 1]=""; + uint8_t charset[2]=""; + uint8_t backend_scramble[20 +1]=""; + + int fd = 0; + + int user_len = strlen(username); + int database_len = strlen(database); + uint8_t *password = NULL; + + uint8_t temp_token[20 +1] =""; + uint8_t stage1_password[20 +1] =""; + + //get password from repository + password = gateway_find_user_password_sha1("pippo", NULL, c, p); + memcpy(backend_scramble, conn->scramble, 20); + + skysql_sha1_2_str(backend_scramble, 20, password, 20, temp_token); + + *token = '\x14'; + + charset[0]='\x08'; + charset[1]='\x00'; + + skysql_str_xor(token+1, temp_token, stage1_hash, 20); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "skygateway TO backend scramble [%s]", backend_scramble); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "skygateway SHA1(password) [%s]", stage1_hash); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "skygateway SHA1(scramble + SHA1(stage1_hash)) [%s]", temp_token); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "skygateway TO backend token [%s]", token+1); + + skysql_payload_size = 1 + user_len + 1 + sizeof(token) + database_len + 1 + sizeof(charset); + + // allocate memory for packet header + payload + outbuf = (uint8_t *) apr_pcalloc(p, sizeof(skysql_packet_header) + skysql_payload_size); + + // write packet header with packet number + skysql_set_byte3(skysql_packet_header, skysql_payload_size); + skysql_packet_header[3] = skysql_packet_id; + + memcpy(outbuf, skysql_packet_header, sizeof(skysql_packet_header)); + memcpy(outbuf + sizeof(skysql_packet_header), &change_user_command, 1); + memcpy(outbuf + sizeof(skysql_packet_header) + 1, username, user_len); + memcpy(outbuf + sizeof(skysql_packet_header) + 1 + strlen(username) + 1, token, 21); + memcpy(outbuf + sizeof(skysql_packet_header) + 1 + strlen(username) + 1 + 21, database, database_len); + memcpy(outbuf + sizeof(skysql_packet_header) + 1 + strlen(username) + 1 + 21 + database_len + 1, charset, sizeof(charset)); + + write(fd, outbuf, sizeof(skysql_packet_header) + skysql_payload_size); +} + +apr_status_t skysql_read_client_autentication(conn_rec *c, apr_pool_t *pool, uint8_t *scramble, int scramble_len, skysql_client_auth *mysql_client_data, uint8_t *stage1_hash) { + apr_bucket_brigade *r_bb; + apr_bucket *r_b; + apr_status_t rv; + int seen_eos = 0; + int child_stopped_reading = 0; + int i; + apr_bucket *auth_bucket; + apr_bucket *bucket; + const char *client_auth_packet = NULL; + unsigned int query_ret = 0; + int return_data = 0; + int input_read = 0; + uint8_t client_flags[4]; + char *current_slave_server_host = NULL; + int current_slave_server_port = 3306; + apr_pool_t *p = NULL; + mysql_driver_details *mysql_driver = NULL; + + uint8_t *token = NULL; + unsigned int token_len = 0; + int auth_ret = 0; + + // use the passed pool? + p = pool == NULL ? c->pool : pool; + + // now read the client authentication + r_bb = apr_brigade_create(p, c->bucket_alloc); + + if (((rv = ap_get_brigade(c->input_filters, r_bb, AP_MODE_READBYTES, APR_BLOCK_READ, 8192)) != APR_SUCCESS) || APR_BRIGADE_EMPTY(r_bb)) { + apr_brigade_destroy(r_bb); + return input_read; + } + + for (auth_bucket = APR_BRIGADE_FIRST(r_bb); bucket != APR_BRIGADE_SENTINEL(r_bb); bucket = APR_BUCKET_NEXT(auth_bucket)) { + apr_size_t len; + const char *data; + + if (APR_BUCKET_IS_EOS(auth_bucket)) { + seen_eos = 1; + break; + } + + if (APR_BUCKET_IS_FLUSH(auth_bucket)) { + continue; + } + + if (child_stopped_reading) { + break; + } + + rv = apr_bucket_read(auth_bucket, &data, &len, APR_BLOCK_READ); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Auth Data len [%i]", len); + + if (rv != APR_SUCCESS) { + child_stopped_reading = 1; + } + + client_auth_packet = apr_pstrmemdup(p, data, len); + input_read = 1; + } + + // this brigade is useless + apr_brigade_destroy(r_bb); + + if (input_read && client_auth_packet) { + // now fill data structure for client data in driver MYSQL5 + if (mysql_client_data != NULL) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "Now decode MYSQL client auth packet"); + mysql_driver = (mysql_driver_details *)mysql_client_data->driver_details; + if (mysql_driver != NULL) { + uint8_t hash_stage1[20 +1]; +/* + uint8_t hash_stage1[20]; + uint8_t hash_stage2[20]; + uint8_t temp_token[20]; + uint8_t client_token[20]; + uint8_t check_auth[20]; + uint8_t final_hash[20]; +*/ + + // todo: insert constant values instead of numbers + memcpy(mysql_driver->client_flags, client_auth_packet + 4, 4); + mysql_driver->connect_with_db = SKYSQL_CAPABILITIES_CONNECT_WITH_DB & skysql_get_byte4(mysql_driver->client_flags); + mysql_client_data->username = apr_pstrndup(p, client_auth_packet + 4 + 4 + 4 + 1 + 23, 128); + memcpy(&token_len, client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(mysql_client_data->username) + 1, 1); + + token = apr_pstrmemdup(p, client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(mysql_client_data->username) + 1 + 1, token_len); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "[client TO gateway] current username is [%s], token is [%s] len %i", mysql_client_data->username, token, token_len); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "[gateway TO client] server scramble was [%s], len %i", scramble, scramble_len); + +/* + + skysql_sha1_str(mysql_client_data->username, strlen(mysql_client_data->username), hash_stage1); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SHA1 di '%s' [%s]", mysql_client_data->username, hash_stage1); + skysql_sha1_str(hash_stage1, 20, hash_stage2); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SHA1 di SHA1('%s') [%s]", mysql_client_data->username, hash_stage2); + + + skysql_sha1_2_str(scramble, scramble_len, hash_stage2, 20, temp_token); + skysql_str_xor(check_auth, hash_stage1, temp_token, 20); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "This is the client input?? [%s]", check_auth); + + memset(temp_token, '\0', sizeof(temp_token)); + memcpy(client_token, scramble, scramble_len); + memcpy(client_token + scramble_len, hash_stage2, 20); + skysql_sha1_str(client_token, scramble_len, temp_token); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "This is the client input?? [%s]", temp_token); + + + skysql_str_xor(check_auth, hash_stage2, hash_stage1, scramble_len); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "XOR( client token, stage2_hash) [%s]", check_auth); + skysql_sha1_str(check_auth, 20, final_hash); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SHA1 di check_auth [%s]", final_hash); +*/ + // decode the token and check the password + auth_ret = skysql_check_scramble(c, p, token, token_len, scramble, scramble_len, mysql_client_data->username, stage1_hash); + + if (auth_ret == 0) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SkySQL Gateway Authentication OK for [%s]", mysql_client_data->username); + } else { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "**** SkySQL Gateway Authentication ERROR [%s], retcode = [%i]", mysql_client_data->username, auth_ret); + } + + if (mysql_driver->connect_with_db) { + mysql_client_data->database = apr_pstrndup(p, client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(mysql_client_data->username) + 1 + 1 + token_len, 128); + } + + } + } + } + + return input_read; +} + +apr_status_t gateway_send_error (conn_rec *c, apr_pool_t *p, uint8_t packet_number) { + apr_status_t rv; + rv = APR_SUCCESS; + apr_bucket_brigade *bb; + apr_bucket *b; + uint8_t *outbuf = NULL; + uint8_t skysql_payload_size = 0; + uint8_t skysql_packet_header[4]; + uint8_t *skysql_payload = NULL; + uint8_t field_count = 0; + uint8_t affected_rows = 0; + uint8_t insert_id = 0; + uint8_t skysql_err[2]; + uint8_t skysql_statemsg[6]; + + unsigned int skysql_errno = 0; + const char *skysql_error_msg = NULL; + const char *skysql_state = NULL; + + skysql_errno = 6969; + skysql_error_msg = "Too many queries in one connection"; + skysql_state = "FA5D3"; + + field_count = 0xff; + skysql_set_byte2(skysql_err, skysql_errno); + skysql_statemsg[0]='#'; + memcpy(skysql_statemsg+1, skysql_state, 5); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL_Error: Errno [%u], ErrorMessage [%s], State [%s]", skysql_errno, skysql_error_msg, skysql_state); + + skysql_payload_size = sizeof(field_count) + sizeof(skysql_err) + sizeof(skysql_statemsg) + strlen(skysql_error_msg); + + // allocate memory for packet header + payload + outbuf = (uint8_t *) apr_pcalloc(p, sizeof(skysql_packet_header) + skysql_payload_size); + + // write packet header with packet number + skysql_set_byte3(skysql_packet_header, skysql_payload_size); + skysql_packet_header[3] = packet_number; + + // write header + memcpy(outbuf, skysql_packet_header, sizeof(skysql_packet_header)); + + skysql_payload = outbuf + sizeof(skysql_packet_header); + + // write field + memcpy(skysql_payload, &field_count, sizeof(field_count)); + skysql_payload = skysql_payload + sizeof(field_count); + + // write errno + memcpy(skysql_payload, skysql_err, sizeof(skysql_err)); + skysql_payload = skysql_payload + sizeof(skysql_err); + + // write sqlstate + memcpy(skysql_payload, skysql_statemsg, sizeof(skysql_statemsg)); + skysql_payload = skysql_payload + sizeof(skysql_statemsg); + + // write err messg + memcpy(skysql_payload, skysql_error_msg, strlen(skysql_error_msg)); + + // create brigade + bb = apr_brigade_create(p, c->bucket_alloc); + b = apr_bucket_pool_create(outbuf, sizeof(skysql_packet_header) + skysql_payload_size, p, c->bucket_alloc); + APR_BRIGADE_INSERT_HEAD(bb, b); + b = apr_bucket_flush_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + + return ap_pass_brigade(c->output_filters, bb); +} + +apr_status_t skysql_send_error (conn_rec *c, uint8_t packet_number, MYSQL_conn *conn) { + apr_status_t rv; + rv = APR_SUCCESS; + apr_bucket_brigade *bb; + apr_bucket *b; + uint8_t *outbuf = NULL; + uint8_t skysql_payload_size = 0; + uint8_t skysql_packet_header[4]; + uint8_t *skysql_payload = NULL; + uint8_t field_count = 0; + uint8_t affected_rows = 0; + uint8_t insert_id = 0; + uint8_t skysql_err[2]; + uint8_t skysql_statemsg[6]; + + unsigned int skysql_errno = 0; + const char *skysql_error_msg = NULL; + const char *skysql_state = NULL; + + skysql_errno = mysql_errno(conn); + skysql_error_msg = mysql_error(conn); + skysql_state = mysql_sqlstate(conn); + + field_count = 0xff; + skysql_set_byte2(skysql_err, skysql_errno); + skysql_statemsg[0]='#'; + memcpy(skysql_statemsg+1, skysql_state, 5); + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQL_Error: Errno [%u], ErrorMessage [%s], State [%s]", skysql_errno, skysql_error_msg, skysql_state); + + skysql_payload_size = sizeof(field_count) + sizeof(skysql_err) + sizeof(skysql_statemsg) + strlen(skysql_error_msg); + + // allocate memory for packet header + payload + outbuf = (uint8_t *) apr_pcalloc(c->pool, sizeof(skysql_packet_header) + skysql_payload_size); + + // write packet header with packet number + skysql_set_byte3(skysql_packet_header, skysql_payload_size); + skysql_packet_header[3] = packet_number; + + // write header + memcpy(outbuf, skysql_packet_header, sizeof(skysql_packet_header)); + + skysql_payload = outbuf + sizeof(skysql_packet_header); + + // write field + memcpy(skysql_payload, &field_count, sizeof(field_count)); + skysql_payload = skysql_payload + sizeof(field_count); + + // write errno + memcpy(skysql_payload, skysql_err, sizeof(skysql_err)); + skysql_payload = skysql_payload + sizeof(skysql_err); + + // write sqlstate + memcpy(skysql_payload, skysql_statemsg, sizeof(skysql_statemsg)); + skysql_payload = skysql_payload + sizeof(skysql_statemsg); + + // write err messg + memcpy(skysql_payload, skysql_error_msg, strlen(skysql_error_msg)); + + // create brigade + bb = apr_brigade_create(c->pool, c->bucket_alloc); + b = apr_bucket_pool_create(outbuf, sizeof(skysql_packet_header) + skysql_payload_size, c->pool, c->bucket_alloc); + APR_BRIGADE_INSERT_HEAD(bb, b); + b = apr_bucket_flush_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + + return ap_pass_brigade(c->output_filters, bb); +} + +apr_status_t skysql_send_result(conn_rec *c, uint8_t *data, uint8_t len) { + apr_status_t rv; + rv = APR_SUCCESS; + apr_bucket_brigade *bb; + apr_bucket *b; + + // create brigade + bb = apr_brigade_create(c->pool, c->bucket_alloc); + + // write + apr_brigade_write(bb, ap_filter_flush, c->output_filters, data, len); + + //send & flush + return ap_fflush(c->output_filters, bb); +} + +apr_status_t skysql_send_eof(conn_rec *c, uint8_t packet_number) { + apr_status_t rv; + rv = APR_SUCCESS; + apr_bucket_brigade *bb; + apr_bucket *b; + uint8_t *outbuf = NULL; + uint8_t skysql_payload_size = 0; + uint8_t skysql_packet_header[4]; + uint8_t *skysql_payload = NULL; + uint8_t field_count = 0; + uint8_t skysql_server_status[2]; + uint8_t skysql_warning_count[2]; + + field_count = 0xfe; + + skysql_payload_size = sizeof(field_count) + sizeof(skysql_server_status) + sizeof(skysql_warning_count); + + // allocate memory for packet header + payload + outbuf = (uint8_t *) apr_pcalloc(c->pool, sizeof(skysql_packet_header) + skysql_payload_size); + + // write packet header with packet number + skysql_set_byte3(skysql_packet_header, skysql_payload_size); + skysql_packet_header[3] = packet_number; + + // write header + memcpy(outbuf, skysql_packet_header, sizeof(skysql_packet_header)); + + skysql_payload = outbuf + sizeof(skysql_packet_header); + + skysql_server_status[0] = 2; + skysql_server_status[1] = 0; + skysql_warning_count[0] = 0; + skysql_warning_count[1] = 0; + + // write data + memcpy(skysql_payload, &field_count, sizeof(field_count)); + skysql_payload = skysql_payload + sizeof(field_count); + + memcpy(skysql_payload, skysql_server_status, sizeof(skysql_server_status)); + skysql_payload = skysql_payload + sizeof(skysql_server_status); + + memcpy(skysql_payload, skysql_warning_count, sizeof(skysql_warning_count)); + skysql_payload = skysql_payload + sizeof(skysql_warning_count); + + // create brigade + bb = apr_brigade_create(c->pool, c->bucket_alloc); + // write + apr_brigade_write(bb, ap_filter_flush, c->output_filters, outbuf, sizeof(skysql_packet_header) + skysql_payload_size); + //send & flush + return ap_fflush(c->output_filters, bb); +} + +apr_status_t skysql_send_ok(conn_rec *c, apr_pool_t *p, uint8_t packet_number, uint8_t in_affected_rows, const char* skysql_message) { + apr_status_t rv; + rv = APR_SUCCESS; + apr_bucket_brigade *bb; + apr_bucket *b; + uint8_t *outbuf = NULL; + uint8_t skysql_payload_size = 0; + uint8_t skysql_packet_header[4]; + uint8_t *skysql_payload = NULL; + uint8_t field_count = 0; + uint8_t affected_rows = 0; + uint8_t insert_id = 0; + uint8_t skysql_server_status[2]; + uint8_t skysql_warning_count[2]; + + affected_rows = in_affected_rows; + + skysql_payload_size = sizeof(field_count) + sizeof(affected_rows) + sizeof(insert_id) + sizeof(skysql_server_status) + sizeof(skysql_warning_count); + + if (skysql_message != NULL) { + skysql_payload_size += strlen(skysql_message); + } + + // allocate memory for packet header + payload + outbuf = (uint8_t *) apr_pcalloc(p, sizeof(skysql_packet_header) + skysql_payload_size); + + // write packet header with packet number + skysql_set_byte3(skysql_packet_header, skysql_payload_size); + skysql_packet_header[3] = packet_number; + + // write header + memcpy(outbuf, skysql_packet_header, sizeof(skysql_packet_header)); + + skysql_payload = outbuf + sizeof(skysql_packet_header); + + skysql_server_status[0] = 2; + skysql_server_status[1] = 0; + skysql_warning_count[0] = 0; + skysql_warning_count[1] = 0; + + // write data + memcpy(skysql_payload, &field_count, sizeof(field_count)); + skysql_payload = skysql_payload + sizeof(field_count); + + memcpy(skysql_payload, &affected_rows, sizeof(affected_rows)); + skysql_payload = skysql_payload + sizeof(affected_rows); + + memcpy(skysql_payload, &insert_id, sizeof(insert_id)); + skysql_payload = skysql_payload + sizeof(insert_id); + + memcpy(skysql_payload, skysql_server_status, sizeof(skysql_server_status)); + skysql_payload = skysql_payload + sizeof(skysql_server_status); + + memcpy(skysql_payload, skysql_warning_count, sizeof(skysql_warning_count)); + skysql_payload = skysql_payload + sizeof(skysql_warning_count); + + if (skysql_message != NULL) { + memcpy(skysql_payload, skysql_message, strlen(skysql_message)); + } + + + // create brigade + bb = apr_brigade_create(p, c->bucket_alloc); + +/* + b = apr_bucket_pool_create(outbuf, sizeof(skysql_packet_header) + skysql_payload_size, c->pool, c->bucket_alloc); + APR_BRIGADE_INSERT_HEAD(bb, b); + b = apr_bucket_flush_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + + return ap_pass_brigade(c->output_filters, bb); +*/ + apr_brigade_write(bb, ap_filter_flush, c->output_filters, outbuf, sizeof(skysql_packet_header) + skysql_payload_size); + ap_fflush(c->output_filters, bb); + + apr_brigade_destroy(bb); + + return 1; +} + +/////////////////////////// +// scramble is 20 bytes and must be pre allocated +apr_status_t skysql_send_handshake(conn_rec *c, uint8_t *scramble, int *scramble_len) { + apr_status_t rv; + rv = APR_SUCCESS; + apr_bucket_brigade *bb; + apr_bucket *b; + apr_pool_t *p = c->pool; + + uint8_t *outbuf = NULL; + uint8_t skysql_payload_size = 0; + uint8_t skysql_packet_header[4]; + uint8_t skysql_packet_id = 0; + uint8_t skysql_filler = SKYSQL_HANDSKAKE_FILLER; + uint8_t skysql_protocol_version = SKYSQL_PROTOCOL_VERSION; + uint8_t *skysql_handshake_payload = NULL; + uint8_t skysql_thread_id[4]; + uint8_t skysql_scramble_buf[9] = ""; + uint8_t skysql_plugin_data[13] = ""; + uint8_t skysql_server_capabilities_one[2]; + uint8_t skysql_server_capabilities_two[2]; + uint8_t skysql_server_language = 8; + uint8_t skysql_server_status[2]; + uint8_t skysql_scramble_len = 21; + uint8_t skysql_filler_ten[10]; + uint8_t skysql_last_byte = 0x00; + + uint8_t scramble_buffer[20]=""; + + skysql_set_random_str(scramble_buffer, 20); + + // set len to the caller + memset(scramble_len, 20, 1); + + // copy back to the caller + memcpy(scramble, scramble_buffer, 20); + + memset(&skysql_filler_ten, 0x00, sizeof(skysql_filler_ten)); + + // thread id, now put the apache child PID, then a conversion map in memory is needed! + skysql_set_byte4(skysql_thread_id, getpid()); + + memcpy(skysql_scramble_buf, scramble_buffer, 8); + + memcpy(skysql_plugin_data, scramble_buffer + 8, 12); + + skysql_payload_size = sizeof(skysql_protocol_version) + (strlen(SKYSQL_VERSION) + 1) + sizeof(skysql_thread_id) + 8 + sizeof(skysql_filler) + sizeof(skysql_server_capabilities_one) + sizeof(skysql_server_language) + sizeof(skysql_server_status) + sizeof(skysql_server_capabilities_two) + sizeof(skysql_scramble_len) + sizeof(skysql_filler_ten) + 12 + sizeof(skysql_last_byte) + strlen("mysql_native_password") + sizeof(skysql_last_byte); + + // allocate memory for packet header + payload + outbuf = (uint8_t *) apr_pcalloc(p, sizeof(skysql_packet_header) + skysql_payload_size); + + // write packet heder with skysql_payload_size + skysql_set_byte3(skysql_packet_header, skysql_payload_size); + //skysql_packet_header[0] = skysql_payload_size; + + // write packent number, now is 0 + skysql_packet_header[3]= skysql_packet_id; + memcpy(outbuf, skysql_packet_header, sizeof(skysql_packet_header)); + + // current buffer pointer + skysql_handshake_payload = outbuf + sizeof(skysql_packet_header); + + // write protocol version + memcpy(skysql_handshake_payload, &skysql_protocol_version, sizeof(skysql_protocol_version)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_protocol_version); + + // write server version plus 0 filler + strcpy(skysql_handshake_payload, SKYSQL_VERSION); + skysql_handshake_payload = skysql_handshake_payload + strlen(SKYSQL_VERSION); + *skysql_handshake_payload = 0x00; + skysql_handshake_payload++; + + // write thread id + memcpy(skysql_handshake_payload, skysql_thread_id, sizeof(skysql_thread_id)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_thread_id); + + // write scramble buf + memcpy(skysql_handshake_payload, skysql_scramble_buf, 8); + skysql_handshake_payload = skysql_handshake_payload + 8; + *skysql_handshake_payload = SKYSQL_HANDSKAKE_FILLER; + skysql_handshake_payload++; + + // write server capabilities part one + skysql_server_capabilities_one[0] = SKYSQL_SERVER_CAPABILITIES_BYTE1; + skysql_server_capabilities_one[1] = SKYSQL_SERVER_CAPABILITIES_BYTE2; + + memcpy(skysql_handshake_payload, skysql_server_capabilities_one, sizeof(skysql_server_capabilities_one)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_server_capabilities_one); + + // write server language + memcpy(skysql_handshake_payload, &skysql_server_language, sizeof(skysql_server_language)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_server_language); + + //write server status + skysql_server_status[0] = 2; + skysql_server_status[1] = 0; + memcpy(skysql_handshake_payload, skysql_server_status, sizeof(skysql_server_status)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_server_status); + + //write server capabilities part two + skysql_server_capabilities_two[0] = 15; + skysql_server_capabilities_two[1] = 128; + memcpy(skysql_handshake_payload, skysql_server_capabilities_two, sizeof(skysql_server_capabilities_two)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_server_capabilities_two); + + // write scramble_len + memcpy(skysql_handshake_payload, &skysql_scramble_len, sizeof(skysql_scramble_len)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_scramble_len); + + //write 10 filler + memcpy(skysql_handshake_payload, skysql_filler_ten, sizeof(skysql_filler_ten)); + skysql_handshake_payload = skysql_handshake_payload + sizeof(skysql_filler_ten); + + // write plugin data + memcpy(skysql_handshake_payload, skysql_plugin_data, 12); + skysql_handshake_payload = skysql_handshake_payload + 12; + + //write last byte, 0 + *skysql_handshake_payload = 0x00; + skysql_handshake_payload++; + + // to be understanded ???? + memcpy(skysql_handshake_payload, "mysql_native_password", strlen("mysql_native_password")); + skysql_handshake_payload = skysql_handshake_payload + strlen("mysql_native_password"); + + //write last byte, 0 + *skysql_handshake_payload = 0x00; + skysql_handshake_payload++; + + + + // create brigade + bb = apr_brigade_create(p, c->bucket_alloc); +/* + b = apr_bucket_pool_create(outbuf, sizeof(skysql_packet_header) + skysql_payload_size, p, c->bucket_alloc); + APR_BRIGADE_INSERT_HEAD(bb, b); + b = apr_bucket_flush_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + + ap_pass_brigade(c->output_filters, bb); + apr_brigade_destroy(bb); +*/ + apr_brigade_write(bb, ap_filter_flush, c->output_filters, outbuf, sizeof(skysql_packet_header) + skysql_payload_size); + ap_fflush(c->output_filters, bb); + + apr_brigade_destroy(bb); + + return 1; +} + +int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query) { + int query_ret = 0; + int num_fields = 0; + int return_data = 0; + uint8_t result_column_count = 0; + uint8_t header_result_packet[4]; + apr_bucket_brigade *bb1; + apr_bucket *b1; + uint8_t *outbuf = NULL; + apr_status_t rv; + uint8_t buffer[MAX_CHUNK]; + unsigned long bytes = MAX_CHUNK; + + query_ret = mysql_query(conn, query); + + fprintf(stderr, "HERE SEND QUERY\n"); + fflush(stderr); + + if (query_ret) { + // send error, packet #1 + skysql_send_error(c, 1, conn); + + return 1; + } + + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending result set ..."); + + rv = apr_socket_recv(conn->socket, buffer, &bytes); + + if (rv != APR_SUCCESS) { + fprintf(stderr, "Errore in recv\n"); + fflush(stderr); + return 1; + } + + bb1 = apr_brigade_create(p, c->bucket_alloc); + + apr_brigade_write(bb1, ap_filter_flush, c->output_filters, buffer, bytes); + ap_fflush(c->output_filters, bb1); + + apr_brigade_destroy(bb1); + + return 0; +} +