New version compatible with prepared statements
Preopened connections are compatible with the mysql protocol I wrote by myself The preopened connection is ONLY working with prefork mpm.
This commit is contained in:
@ -6,6 +6,7 @@
|
||||
|
||||
#include "skysql_gw.h"
|
||||
#include "apr_sha1.h"
|
||||
#include "apr_general.h"
|
||||
|
||||
#define MYSQL_PROTOCOL_VERSION41_CHAR '*'
|
||||
|
||||
@ -534,7 +535,7 @@ apr_status_t skysql_send_result(conn_rec *c, uint8_t *data, uint8_t len) {
|
||||
return ap_fflush(c->output_filters, bb);
|
||||
}
|
||||
|
||||
apr_status_t skysql_send_eof(conn_rec *c, uint8_t packet_number) {
|
||||
apr_status_t skysql_send_eof(conn_rec *c, apr_pool_t *p, uint8_t packet_number) {
|
||||
apr_status_t rv;
|
||||
rv = APR_SUCCESS;
|
||||
apr_bucket_brigade *bb;
|
||||
@ -552,7 +553,7 @@ apr_status_t skysql_send_eof(conn_rec *c, uint8_t packet_number) {
|
||||
skysql_payload_size = sizeof(field_count) + sizeof(skysql_server_status) + sizeof(skysql_warning_count);
|
||||
|
||||
// allocate memory for packet header + payload
|
||||
outbuf = (uint8_t *) apr_pcalloc(c->pool, sizeof(skysql_packet_header) + skysql_payload_size);
|
||||
outbuf = (uint8_t *) apr_pcalloc(p, sizeof(skysql_packet_header) + skysql_payload_size);
|
||||
|
||||
// write packet header with packet number
|
||||
skysql_set_byte3(skysql_packet_header, skysql_payload_size);
|
||||
@ -579,7 +580,7 @@ apr_status_t skysql_send_eof(conn_rec *c, uint8_t packet_number) {
|
||||
skysql_payload = skysql_payload + sizeof(skysql_warning_count);
|
||||
|
||||
// create brigade
|
||||
bb = apr_brigade_create(c->pool, c->bucket_alloc);
|
||||
bb = apr_brigade_create(p, c->bucket_alloc);
|
||||
// write
|
||||
apr_brigade_write(bb, ap_filter_flush, c->output_filters, outbuf, sizeof(skysql_packet_header) + skysql_payload_size);
|
||||
//send & flush
|
||||
@ -828,12 +829,16 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const
|
||||
apr_status_t rv;
|
||||
uint8_t buffer[MAX_CHUNK];
|
||||
unsigned long bytes = MAX_CHUNK;
|
||||
unsigned long tot_bytes = 0;
|
||||
int cycle=0;
|
||||
apr_pollset_t *pset;
|
||||
apr_pollfd_t pfd;
|
||||
apr_int32_t nsocks=1;
|
||||
apr_status_t poll_rv;
|
||||
int is_eof = 0;
|
||||
|
||||
query_ret = mysql_query(conn, query);
|
||||
|
||||
fprintf(stderr, "HERE SEND QUERY\n");
|
||||
fflush(stderr);
|
||||
|
||||
if (query_ret) {
|
||||
// send error, packet #1
|
||||
skysql_send_error(c, 1, conn);
|
||||
@ -843,21 +848,368 @@ int skygateway_query_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending result set ...");
|
||||
|
||||
rv = apr_socket_recv(conn->socket, buffer, &bytes);
|
||||
poll_rv = apr_pollset_create(&pset, 1, p, 0);
|
||||
|
||||
if (rv != APR_SUCCESS) {
|
||||
fprintf(stderr, "Errore in recv\n");
|
||||
fflush(stderr);
|
||||
return 1;
|
||||
}
|
||||
pfd.p = p;
|
||||
pfd.desc_type = APR_POLL_SOCKET;
|
||||
pfd.reqevents = APR_POLLIN;
|
||||
pfd.rtnevents = APR_POLLIN;
|
||||
pfd.desc.s = conn->socket;
|
||||
pfd.client_data = NULL;
|
||||
|
||||
bb1 = apr_brigade_create(p, c->bucket_alloc);
|
||||
//rv = apr_pollset_add(pset, &pfd);
|
||||
|
||||
apr_brigade_write(bb1, ap_filter_flush, c->output_filters, buffer, bytes);
|
||||
ap_fflush(c->output_filters, bb1);
|
||||
//rv = apr_socket_opt_set(conn->socket, APR_SO_NONBLOCK , 1);
|
||||
|
||||
apr_brigade_destroy(bb1);
|
||||
apr_socket_timeout_set(conn->socket, 100000000);
|
||||
|
||||
while(1) {
|
||||
char errmesg_p[1000]="";
|
||||
bytes=MAX_CHUNK;
|
||||
|
||||
memset(buffer, '\0', MAX_CHUNK);
|
||||
|
||||
//rv = apr_wait_for_io_or_timeout(NULL, conn->socket, 1);
|
||||
//fprintf(stderr, "wait socket recv %lu\n", bytes);
|
||||
//apr_strerror(rv, errmesg_p, sizeof(errmesg_p));
|
||||
//fprintf(stderr, "wait Errore in recv, rv is %i, [%s]\n", rv, errmesg_p);
|
||||
//fflush(stderr);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is receiving ...");
|
||||
|
||||
//apr_socket_atreadeof(conn->socket, &is_eof);
|
||||
|
||||
rv = apr_socket_recv(conn->socket, buffer, &bytes);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW received %lu bytes", bytes);
|
||||
|
||||
if (rv) {
|
||||
if (APR_STATUS_IS_EAGAIN(rv)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tot_bytes += bytes;
|
||||
|
||||
if (rv != APR_SUCCESS && rv != APR_EOF && rv != APR_EAGAIN) {
|
||||
char errmesg[1000]="";
|
||||
apr_strerror(rv, errmesg, sizeof(errmesg));
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive error %i, [%s]", rv, errmesg);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (rv == APR_EOF && bytes == 0) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive error: EOF");
|
||||
}
|
||||
|
||||
bb1 = apr_brigade_create(p, c->bucket_alloc);
|
||||
|
||||
apr_brigade_write(bb1, ap_filter_flush, c->output_filters, buffer, bytes);
|
||||
ap_fflush(c->output_filters, bb1);
|
||||
|
||||
apr_brigade_destroy(bb1);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive, brigade sent with %li bytes", bytes);
|
||||
|
||||
cycle++;
|
||||
|
||||
|
||||
if (bytes < MAX_CHUNK) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: less bytes than buffer here, Return from query result: total bytes %lu in %i", tot_bytes, cycle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (bytes == MAX_CHUNK) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: ALL bytes in the buffer here, continue");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: Return from query result: total bytes %lu in %i", tot_bytes, cycle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int skygateway_statement_prepare_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query, int len) {
|
||||
int query_ret = 0;
|
||||
int num_fields = 0;
|
||||
int return_data = 0;
|
||||
uint8_t result_column_count = 0;
|
||||
uint8_t header_result_packet[4];
|
||||
apr_bucket_brigade *bb1;
|
||||
apr_bucket *b1;
|
||||
uint8_t *outbuf = NULL;
|
||||
apr_status_t rv;
|
||||
uint8_t buffer[MAX_CHUNK];
|
||||
unsigned long bytes = MAX_CHUNK;
|
||||
unsigned long tot_bytes = 0;
|
||||
int cycle=0;
|
||||
apr_pollset_t *pset;
|
||||
apr_pollfd_t pfd;
|
||||
apr_int32_t nsocks=1;
|
||||
apr_status_t poll_rv;
|
||||
int is_eof = 0;
|
||||
|
||||
query_ret = mysql_send_command(conn, query, 0x16, len);
|
||||
|
||||
if (query_ret) {
|
||||
// send error, packet #1
|
||||
skysql_send_error(c, 1, conn);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending result set ...");
|
||||
|
||||
poll_rv = apr_pollset_create(&pset, 1, p, 0);
|
||||
|
||||
pfd.p = p;
|
||||
pfd.desc_type = APR_POLL_SOCKET;
|
||||
pfd.reqevents = APR_POLLIN;
|
||||
pfd.rtnevents = APR_POLLIN;
|
||||
pfd.desc.s = conn->socket;
|
||||
pfd.client_data = NULL;
|
||||
|
||||
//rv = apr_pollset_add(pset, &pfd);
|
||||
|
||||
//rv = apr_socket_opt_set(conn->socket, APR_SO_NONBLOCK , 1);
|
||||
|
||||
apr_socket_timeout_set(conn->socket, 100000000);
|
||||
|
||||
while(1) {
|
||||
char errmesg_p[1000]="";
|
||||
bytes=MAX_CHUNK;
|
||||
|
||||
memset(buffer, '\0', MAX_CHUNK);
|
||||
|
||||
//rv = apr_wait_for_io_or_timeout(NULL, conn->socket, 1);
|
||||
//fprintf(stderr, "wait socket recv %lu\n", bytes);
|
||||
//apr_strerror(rv, errmesg_p, sizeof(errmesg_p));
|
||||
//fprintf(stderr, "wait Errore in recv, rv is %i, [%s]\n", rv, errmesg_p);
|
||||
//fflush(stderr);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is receiving ...");
|
||||
|
||||
//apr_socket_atreadeof(conn->socket, &is_eof);
|
||||
|
||||
rv = apr_socket_recv(conn->socket, buffer, &bytes);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW received %lu bytes", bytes);
|
||||
|
||||
if (rv) {
|
||||
if (APR_STATUS_IS_EAGAIN(rv)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tot_bytes += bytes;
|
||||
|
||||
if (rv != APR_SUCCESS && rv != APR_EOF && rv != APR_EAGAIN) {
|
||||
char errmesg[1000]="";
|
||||
apr_strerror(rv, errmesg, sizeof(errmesg));
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive error %i, [%s]", rv, errmesg);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (rv == APR_EOF && bytes == 0) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive error: EOF");
|
||||
}
|
||||
|
||||
bb1 = apr_brigade_create(p, c->bucket_alloc);
|
||||
|
||||
apr_brigade_write(bb1, ap_filter_flush, c->output_filters, buffer, bytes);
|
||||
ap_fflush(c->output_filters, bb1);
|
||||
|
||||
apr_brigade_destroy(bb1);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive, brigade sent with %li bytes", bytes);
|
||||
|
||||
cycle++;
|
||||
|
||||
|
||||
if (bytes < MAX_CHUNK) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: less bytes than buffer here, Return from query result: total bytes %lu in %i", tot_bytes, cycle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (bytes == MAX_CHUNK) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: ALL bytes in the buffer here, continue");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: Return from query result: total bytes %lu in %i", tot_bytes, cycle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int skygateway_statement_execute_result(conn_rec *c, apr_pool_t *p, MYSQL_conn *conn, const char *query, int len) {
|
||||
int query_ret = 0;
|
||||
int num_fields = 0;
|
||||
int return_data = 0;
|
||||
uint8_t result_column_count = 0;
|
||||
uint8_t header_result_packet[4];
|
||||
apr_bucket_brigade *bb1;
|
||||
apr_bucket *b1;
|
||||
uint8_t *outbuf = NULL;
|
||||
apr_status_t rv;
|
||||
uint8_t buffer[MAX_CHUNK];
|
||||
unsigned long bytes = MAX_CHUNK;
|
||||
unsigned long tot_bytes = 0;
|
||||
int cycle=0;
|
||||
apr_pollset_t *pset;
|
||||
apr_pollfd_t pfd;
|
||||
apr_int32_t nsocks=1;
|
||||
apr_status_t poll_rv;
|
||||
int is_eof = 0;
|
||||
|
||||
query_ret = mysql_send_command(conn, query, 0x17, len);
|
||||
|
||||
if (query_ret) {
|
||||
// send error, packet #1
|
||||
skysql_send_error(c, 1, conn);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is sending result set ...");
|
||||
|
||||
poll_rv = apr_pollset_create(&pset, 1, p, 0);
|
||||
|
||||
pfd.p = p;
|
||||
pfd.desc_type = APR_POLL_SOCKET;
|
||||
pfd.reqevents = APR_POLLIN;
|
||||
pfd.rtnevents = APR_POLLIN;
|
||||
pfd.desc.s = conn->socket;
|
||||
pfd.client_data = NULL;
|
||||
|
||||
//rv = apr_pollset_add(pset, &pfd);
|
||||
|
||||
//rv = apr_socket_opt_set(conn->socket, APR_SO_NONBLOCK , 1);
|
||||
|
||||
apr_socket_timeout_set(conn->socket, 100000000);
|
||||
|
||||
while(1) {
|
||||
char errmesg_p[1000]="";
|
||||
bytes=MAX_CHUNK;
|
||||
|
||||
memset(buffer, '\0', MAX_CHUNK);
|
||||
|
||||
//rv = apr_wait_for_io_or_timeout(NULL, conn->socket, 1);
|
||||
//fprintf(stderr, "wait socket recv %lu\n", bytes);
|
||||
//apr_strerror(rv, errmesg_p, sizeof(errmesg_p));
|
||||
//fprintf(stderr, "wait Errore in recv, rv is %i, [%s]\n", rv, errmesg_p);
|
||||
//fflush(stderr);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW is receiving ...");
|
||||
|
||||
//apr_socket_atreadeof(conn->socket, &is_eof);
|
||||
|
||||
rv = apr_socket_recv(conn->socket, buffer, &bytes);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW received %lu bytes", bytes);
|
||||
|
||||
if (rv) {
|
||||
if (APR_STATUS_IS_EAGAIN(rv)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tot_bytes += bytes;
|
||||
|
||||
if (rv != APR_SUCCESS && rv != APR_EOF && rv != APR_EAGAIN) {
|
||||
char errmesg[1000]="";
|
||||
apr_strerror(rv, errmesg, sizeof(errmesg));
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive error %i, [%s]", rv, errmesg);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (rv == APR_EOF && bytes == 0) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive error: EOF");
|
||||
}
|
||||
|
||||
bb1 = apr_brigade_create(p, c->bucket_alloc);
|
||||
|
||||
apr_brigade_write(bb1, ap_filter_flush, c->output_filters, buffer, bytes);
|
||||
ap_fflush(c->output_filters, bb1);
|
||||
|
||||
apr_brigade_destroy(bb1);
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive, brigade sent with %li bytes", bytes);
|
||||
|
||||
cycle++;
|
||||
|
||||
|
||||
if (bytes < MAX_CHUNK) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: less bytes than buffer here, Return from query result: total bytes %lu in %i", tot_bytes, cycle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (bytes == MAX_CHUNK) {
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: ALL bytes in the buffer here, continue");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, c->base_server, "SKYSQLGW receive: Return from query result: total bytes %lu in %i", tot_bytes, cycle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mysql_send_command(MYSQL_conn *conn, const char *command, int cmd, int len) {
|
||||
apr_status_t rv;
|
||||
//uint8_t *packet_buffer=NULL;
|
||||
uint8_t packet_buffer[SMALL_CHUNK];
|
||||
long bytes;
|
||||
int fd;
|
||||
|
||||
//packet_buffer = (uint8_t *) calloc(1, 5 + len + 1);
|
||||
memset(&packet_buffer, '\0', sizeof(packet_buffer));
|
||||
|
||||
packet_buffer[4]= cmd;
|
||||
memcpy(packet_buffer+5, command, len);
|
||||
|
||||
skysql_set_byte3(packet_buffer, 1 + len);
|
||||
|
||||
bytes = 4 + 1 + len;
|
||||
|
||||
#ifdef MYSQL_CONN_DEBUG
|
||||
fprintf(stderr, "THE COMMAND is [%s] len %i\n", command, bytes);
|
||||
fprintf(stderr, "THE COMMAND TID is [%lu]", conn->tid);
|
||||
fprintf(stderr, "THE COMMAND scramble is [%s]", conn->scramble);
|
||||
if (conn->socket == NULL) {
|
||||
fprintf(stderr, "***** THE COMMAND sock struct is NULL\n");
|
||||
}
|
||||
fwrite(packet_buffer, bytes, 1, stderr);
|
||||
fflush(stderr);
|
||||
#endif
|
||||
apr_os_sock_get(&fd,conn->socket);
|
||||
|
||||
#ifdef MYSQL_CONN_DEBUG
|
||||
fprintf(stderr, "COMMAND Socket FD is %i\n", fd);
|
||||
fflush(stderr);
|
||||
#endif
|
||||
|
||||
rv = apr_socket_send(conn->socket, packet_buffer, &bytes);
|
||||
|
||||
#ifdef MYSQL_CONN_DEBUG
|
||||
fprintf(stderr, "COMMAND SENT [%x] [%s]\n", cmd, command);
|
||||
fflush(stderr);
|
||||
#endif
|
||||
|
||||
if (rv != APR_SUCCESS) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user