Limit the size of bulk insert operations by query length instead of number of rows.

This commit is contained in:
Alexey Kopytov
2007-01-12 18:27:59 +00:00
parent e643b65069
commit dbcb6cad02
2 changed files with 33 additions and 25 deletions

View File

@ -26,8 +26,8 @@
#include "db_driver.h"
#include "sb_list.h"
/* How many rows to insert in a single query (used in bulk insert operations) */
#define INSERT_ROWS 10000
/* Query length limit for bulk insert queries */
#define BULK_PACKET_SIZE (512*1024)
/* How many rows to insert before COMMITs (used in bulk insert) */
#define ROWS_BEFORE_COMMIT 1000
@ -689,20 +689,27 @@ int db_bulk_insert_init(db_conn_t *con, const char *query)
return 1;
}
/* Allocate initial query buffer (will be expanded later if needed) */
/* Allocate query buffer */
query_len = strlen(query);
con->bulk_max_rows = driver_caps.multi_rows_insert ? INSERT_ROWS : 1;
con->bulk_commit_max = driver_caps.needs_commit ? ROWS_BEFORE_COMMIT : 0;
con->bulk_commit_cnt = 0;
con->bulk_buflen = query_len + con->bulk_max_rows * 80 + 1;
if (query_len + 1 > BULK_PACKET_SIZE)
{
log_text(LOG_FATAL,
"Query length exceeds the maximum value (%u), aborting",
BULK_PACKET_SIZE);
return 1;
}
con->bulk_buflen = BULK_PACKET_SIZE;
con->bulk_buffer = (char *)malloc(con->bulk_buflen);
if (con->bulk_buffer == NULL)
return 1;
con->bulk_supported = driver_caps.multi_rows_insert;
con->bulk_commit_max = driver_caps.needs_commit ? ROWS_BEFORE_COMMIT : 0;
con->bulk_commit_cnt = 0;
strcpy(con->bulk_buffer, query);
con->bulk_ptr = query_len;
con->bulk_ptr_orig = query_len;
con->bulk_cnt = 0;
con->bulk_not_first = 0;
return 0;
}
@ -717,28 +724,30 @@ int db_bulk_insert_next(db_conn_t *con, const char *query)
Reserve space for '\0' and ',' (if not the first chunk in
a bulk insert
*/
while (con->bulk_ptr + query_len + 1 + con->bulk_not_first > con->bulk_buflen)
if (con->bulk_ptr + query_len + 1 + (con->bulk_cnt>0) > con->bulk_buflen)
{
con->bulk_buffer = (char *)realloc(con->bulk_buffer, con->bulk_buflen * 2);
if (con->bulk_buffer == NULL)
/* Is this a first row? */
if (!con->bulk_cnt)
{
log_text(LOG_FATAL,
"Query length exceeds the maximum value (%u), aborting",
con->bulk_buflen);
return 1;
}
if (db_bulk_do_insert(con, 0))
return 1;
con->bulk_buflen *= 2;
}
if (con->bulk_not_first)
if (con->bulk_cnt > 0)
{
con->bulk_buffer[con->bulk_ptr] = ',';
strcpy(con->bulk_buffer + con->bulk_ptr + 1, query);
}
else
strcpy(con->bulk_buffer + con->bulk_ptr, query);
con->bulk_ptr += query_len + con->bulk_not_first;
con->bulk_not_first = 1;
con->bulk_ptr += query_len + (con->bulk_cnt > 0);
con->bulk_cnt++;
if (con->bulk_cnt == con->bulk_max_rows && db_bulk_do_insert(con, 0))
return 1;
return 0;
}
@ -747,19 +756,16 @@ int db_bulk_insert_next(db_conn_t *con, const char *query)
int db_bulk_do_insert(db_conn_t *con, int is_last)
{
if (con->bulk_not_first == 0)
if (!con->bulk_cnt)
return 0;
if (db_query(con, con->bulk_buffer) == NULL)
return 1;
con->bulk_not_first = 0;
con->bulk_ptr = con->bulk_ptr_orig;
con->bulk_cnt = 0;
if (con->bulk_commit_max != 0)
{
con->bulk_commit_cnt += con->bulk_max_rows;
con->bulk_commit_cnt += con->bulk_cnt;
if (is_last || con->bulk_commit_cnt >= con->bulk_commit_max)
{
@ -769,6 +775,9 @@ int db_bulk_do_insert(db_conn_t *con, int is_last)
}
}
con->bulk_ptr = con->bulk_ptr_orig;
con->bulk_cnt = 0;
return 0;
}

View File

@ -190,13 +190,12 @@ typedef struct db_conn
db_error_t db_errno;
/* Internal fields */
char bulk_supported; /* 1, if multi-row inserts are supported by the driver */
unsigned int bulk_cnt; /* Current number of rows in bulk insert buffer */
unsigned int bulk_max_rows; /* Maximum number of rows in bulk insert buffer */
char * bulk_buffer; /* Bulk insert query buffer */
unsigned int bulk_buflen; /* Current length of bulk_buffer */
unsigned int bulk_ptr; /* Current position in bulk_buffer */
unsigned int bulk_ptr_orig; /* Save value of bulk_ptr */
unsigned int bulk_not_first; /* Indicates if bulk insert buffer has some rows */
unsigned int bulk_commit_cnt; /* Current value of uncommitted rows */
unsigned int bulk_commit_max; /* Maximum value of uncommitted rows */
int thread_id; /* Assiciated thread id (required to collect per-thread stats */