From dbcb6cad02c431fd7b4ea4fd0488ff24cf4a5d32 Mon Sep 17 00:00:00 2001 From: Alexey Kopytov Date: Fri, 12 Jan 2007 18:27:59 +0000 Subject: [PATCH] Limit the size of bulk insert operations by query length instead of number of rows. --- sysbench/db_driver.c | 55 ++++++++++++++++++++++++++------------------ sysbench/db_driver.h | 3 +-- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/sysbench/db_driver.c b/sysbench/db_driver.c index 4225c9d..faaf1d6 100644 --- a/sysbench/db_driver.c +++ b/sysbench/db_driver.c @@ -26,8 +26,8 @@ #include "db_driver.h" #include "sb_list.h" -/* How many rows to insert in a single query (used in bulk insert operations) */ -#define INSERT_ROWS 10000 +/* Query length limit for bulk insert queries */ +#define BULK_PACKET_SIZE (512*1024) /* How many rows to insert before COMMITs (used in bulk insert) */ #define ROWS_BEFORE_COMMIT 1000 @@ -689,20 +689,27 @@ int db_bulk_insert_init(db_conn_t *con, const char *query) return 1; } - /* Allocate initial query buffer (will be expanded later if needed) */ + /* Allocate query buffer */ query_len = strlen(query); - con->bulk_max_rows = driver_caps.multi_rows_insert ? INSERT_ROWS : 1; - con->bulk_commit_max = driver_caps.needs_commit ? ROWS_BEFORE_COMMIT : 0; - con->bulk_commit_cnt = 0; - con->bulk_buflen = query_len + con->bulk_max_rows * 80 + 1; + if (query_len + 1 > BULK_PACKET_SIZE) + { + log_text(LOG_FATAL, + "Query length exceeds the maximum value (%u), aborting", + BULK_PACKET_SIZE); + return 1; + } + con->bulk_buflen = BULK_PACKET_SIZE; con->bulk_buffer = (char *)malloc(con->bulk_buflen); if (con->bulk_buffer == NULL) return 1; + + con->bulk_supported = driver_caps.multi_rows_insert; + con->bulk_commit_max = driver_caps.needs_commit ? ROWS_BEFORE_COMMIT : 0; + con->bulk_commit_cnt = 0; strcpy(con->bulk_buffer, query); con->bulk_ptr = query_len; con->bulk_ptr_orig = query_len; con->bulk_cnt = 0; - con->bulk_not_first = 0; return 0; } @@ -717,28 +724,30 @@ int db_bulk_insert_next(db_conn_t *con, const char *query) Reserve space for '\0' and ',' (if not the first chunk in a bulk insert */ - while (con->bulk_ptr + query_len + 1 + con->bulk_not_first > con->bulk_buflen) + if (con->bulk_ptr + query_len + 1 + (con->bulk_cnt>0) > con->bulk_buflen) { - con->bulk_buffer = (char *)realloc(con->bulk_buffer, con->bulk_buflen * 2); - if (con->bulk_buffer == NULL) + /* Is this a first row? */ + if (!con->bulk_cnt) + { + log_text(LOG_FATAL, + "Query length exceeds the maximum value (%u), aborting", + con->bulk_buflen); + return 1; + } + if (db_bulk_do_insert(con, 0)) return 1; - con->bulk_buflen *= 2; } - if (con->bulk_not_first) + if (con->bulk_cnt > 0) { con->bulk_buffer[con->bulk_ptr] = ','; strcpy(con->bulk_buffer + con->bulk_ptr + 1, query); } else strcpy(con->bulk_buffer + con->bulk_ptr, query); - con->bulk_ptr += query_len + con->bulk_not_first; - - con->bulk_not_first = 1; + con->bulk_ptr += query_len + (con->bulk_cnt > 0); con->bulk_cnt++; - if (con->bulk_cnt == con->bulk_max_rows && db_bulk_do_insert(con, 0)) - return 1; return 0; } @@ -747,19 +756,16 @@ int db_bulk_insert_next(db_conn_t *con, const char *query) int db_bulk_do_insert(db_conn_t *con, int is_last) { - if (con->bulk_not_first == 0) + if (!con->bulk_cnt) return 0; if (db_query(con, con->bulk_buffer) == NULL) return 1; - con->bulk_not_first = 0; - con->bulk_ptr = con->bulk_ptr_orig; - con->bulk_cnt = 0; if (con->bulk_commit_max != 0) { - con->bulk_commit_cnt += con->bulk_max_rows; + con->bulk_commit_cnt += con->bulk_cnt; if (is_last || con->bulk_commit_cnt >= con->bulk_commit_max) { @@ -769,6 +775,9 @@ int db_bulk_do_insert(db_conn_t *con, int is_last) } } + con->bulk_ptr = con->bulk_ptr_orig; + con->bulk_cnt = 0; + return 0; } diff --git a/sysbench/db_driver.h b/sysbench/db_driver.h index 5f1817d..9381df5 100644 --- a/sysbench/db_driver.h +++ b/sysbench/db_driver.h @@ -190,13 +190,12 @@ typedef struct db_conn db_error_t db_errno; /* Internal fields */ + char bulk_supported; /* 1, if multi-row inserts are supported by the driver */ unsigned int bulk_cnt; /* Current number of rows in bulk insert buffer */ - unsigned int bulk_max_rows; /* Maximum number of rows in bulk insert buffer */ char * bulk_buffer; /* Bulk insert query buffer */ unsigned int bulk_buflen; /* Current length of bulk_buffer */ unsigned int bulk_ptr; /* Current position in bulk_buffer */ unsigned int bulk_ptr_orig; /* Save value of bulk_ptr */ - unsigned int bulk_not_first; /* Indicates if bulk insert buffer has some rows */ unsigned int bulk_commit_cnt; /* Current value of uncommitted rows */ unsigned int bulk_commit_max; /* Maximum value of uncommitted rows */ int thread_id; /* Assiciated thread id (required to collect per-thread stats */