Merge remote-tracking branch 'origin/MAX-324' into MAX-324

Conflicts:
	server/modules/routing/dbshard/dbshard.c
This commit is contained in:
Markus Makela
2014-12-07 06:11:24 +02:00
9 changed files with 476 additions and 850 deletions

View File

@ -94,7 +94,9 @@ char* syslog_ident_str = NULL;
*/ */
static int lmlock; static int lmlock;
static logmanager_t* lm; static logmanager_t* lm;
static bool flushall_flag;
static bool flushall_started_flag;
static bool flushall_done_flag;
/** Writer thread structure */ /** Writer thread structure */
struct filewriter_st { struct filewriter_st {
@ -286,12 +288,14 @@ static char* add_slash(char* str);
static bool check_file_and_path( static bool check_file_and_path(
char* filename, char* filename,
bool* writable); bool* writable,
bool do_log);
static bool file_is_symlink(char* filename); static bool file_is_symlink(char* filename);
static int skygw_log_disable_raw(logfile_id_t id, bool emergency); /*< no locking */ static int skygw_log_disable_raw(logfile_id_t id, bool emergency); /*< no locking */
static int find_last_seqno(strpart_t* parts, int seqno, int seqnoidx); static int find_last_seqno(strpart_t* parts, int seqno, int seqnoidx);
void flushall_logfiles(bool flush);
bool thr_flushall_check();
const char* get_suffix_default(void) const char* get_suffix_default(void)
{ {
@ -441,17 +445,6 @@ static bool logmanager_init_nomutex(
return_succp: return_succp:
if (err != 0) if (err != 0)
{ {
if (lm != NULL)
{
if (lm->lm_clientmes != NULL)
{
skygw_message_done(lm->lm_clientmes);
}
if (lm->lm_logmes != NULL)
{
skygw_message_done(lm->lm_logmes);
}
}
/** This releases memory of all created objects */ /** This releases memory of all created objects */
logmanager_done_nomutex(); logmanager_done_nomutex();
fprintf(stderr, "*\n* Error : Initializing log manager failed.\n*\n"); fprintf(stderr, "*\n* Error : Initializing log manager failed.\n*\n");
@ -2053,7 +2046,7 @@ static bool logfile_create(
* If file exists but is different type, create fails and * If file exists but is different type, create fails and
* new, increased sequence number is added to file name. * new, increased sequence number is added to file name.
*/ */
if (check_file_and_path(lf->lf_full_file_name, &writable)) if (check_file_and_path(lf->lf_full_file_name, &writable, true))
{ {
/** Found similarly named file which isn't writable */ /** Found similarly named file which isn't writable */
if (!writable || file_is_symlink(lf->lf_full_file_name)) if (!writable || file_is_symlink(lf->lf_full_file_name))
@ -2077,13 +2070,11 @@ static bool logfile_create(
if (store_shmem) if (store_shmem)
{ {
if (check_file_and_path(lf->lf_full_file_name, &writable)) if (check_file_and_path(lf->lf_full_link_name, &writable, true))
{ {
/** Found similarly named file which isn't writable */ /** Found similarly named link which isn't writable */
if (!writable || if (!writable)
file_is_symlink(lf->lf_full_file_name))
{ {
unlink(lf->lf_full_file_name);
nameconflicts = true; nameconflicts = true;
} }
} }
@ -2214,7 +2205,6 @@ return_succp:
* *
* @return Pointer to filename, of NULL if failed. * @return Pointer to filename, of NULL if failed.
* *
*
*/ */
static char* form_full_file_name( static char* form_full_file_name(
strpart_t* parts, strpart_t* parts,
@ -2231,9 +2221,21 @@ static char* form_full_file_name(
if (lf->lf_name_seqno != -1) if (lf->lf_name_seqno != -1)
{ {
lf->lf_name_seqno = find_last_seqno(parts, int file_sn;
lf->lf_name_seqno, int link_sn = 0;
seqnoidx); char* tmp = parts[0].sp_string;
file_sn = find_last_seqno(parts, lf->lf_name_seqno, seqnoidx);
if (lf->lf_linkpath != NULL)
{
tmp = parts[0].sp_string;
parts[0].sp_string = lf->lf_linkpath;
link_sn = find_last_seqno(parts, lf->lf_name_seqno, seqnoidx);
parts[0].sp_string = tmp;
}
lf->lf_name_seqno = MAX(file_sn, link_sn);
seqno = lf->lf_name_seqno; seqno = lf->lf_name_seqno;
s = UINTLEN(seqno); s = UINTLEN(seqno);
seqnostr = (char *)malloc((int)s+1); seqnostr = (char *)malloc((int)s+1);
@ -2354,7 +2356,8 @@ static char* add_slash(
*/ */
static bool check_file_and_path( static bool check_file_and_path(
char* filename, char* filename,
bool* writable) bool* writable,
bool do_log)
{ {
int fd; int fd;
bool exists; bool exists;
@ -2382,11 +2385,23 @@ static bool check_file_and_path(
if (fd == -1) if (fd == -1)
{ {
fprintf(stderr, if (do_log && file_is_symlink(filename))
"*\n* Error : Can't access %s due " {
"to %s.\n", fprintf(stderr,
filename, "*\n* Error : Can't access "
strerror(errno)); "file pointed to by %s due "
"to %s.\n",
filename,
strerror(errno));
}
else if (do_log)
{
fprintf(stderr,
"*\n* Error : Can't access %s due "
"to %s.\n",
filename,
strerror(errno));
}
if (writable) if (writable)
{ {
*writable = false; *writable = false;
@ -2403,11 +2418,24 @@ static bool check_file_and_path(
} }
else else
{ {
fprintf(stderr, if (do_log &&
"*\n* Error : Can't write to " file_is_symlink(filename))
"%s due to %s.\n", {
filename, fprintf(stderr,
strerror(errno)); "*\n* Error : Can't write to "
"file pointed to by %s due to "
"%s.\n",
filename,
strerror(errno));
}
else if (do_log)
{
fprintf(stderr,
"*\n* Error : Can't write to "
"%s due to %s.\n",
filename,
strerror(errno));
}
*writable = false; *writable = false;
} }
} }
@ -2417,10 +2445,21 @@ static bool check_file_and_path(
} }
else else
{ {
fprintf(stderr, if (do_log && file_is_symlink(filename))
"*\n* Error : Can't access %s due to %s.\n", {
filename, fprintf(stderr,
strerror(errno)); "*\n* Error : Can't access the file "
"pointed to by %s due to %s.\n",
filename,
strerror(errno));
}
else if (do_log)
{
fprintf(stderr,
"*\n* Error : Can't access %s due to %s.\n",
filename,
strerror(errno));
}
exists = false; exists = false;
if (writable) if (writable)
@ -2572,7 +2611,7 @@ static bool logfile_init(
logfile_free_memory(logfile); logfile_free_memory(logfile);
goto return_with_succp; goto return_with_succp;
} }
#if defined(SS_DEBUG)
if (store_shmem) if (store_shmem)
{ {
fprintf(stderr, "%s\t: %s->%s\n", fprintf(stderr, "%s\t: %s->%s\n",
@ -2586,7 +2625,6 @@ static bool logfile_init(
STRLOGNAME(logfile_id), STRLOGNAME(logfile_id),
logfile->lf_full_file_name); logfile->lf_full_file_name);
} }
#endif
succp = true; succp = true;
logfile->lf_state = RUN; logfile->lf_state = RUN;
CHK_LOGFILE(logfile); CHK_LOGFILE(logfile);
@ -2802,13 +2840,15 @@ static void* thr_filewriter_fun(
int i; int i;
blockbuf_state_t flush_blockbuf; /**< flush single block buffer. */ blockbuf_state_t flush_blockbuf; /**< flush single block buffer. */
bool flush_logfile; /**< flush logfile */ bool flush_logfile; /**< flush logfile */
bool flushall_logfiles;/**< flush all logfiles */ bool do_flushall = false;
bool rotate_logfile; /*< close current and open new file */ bool rotate_logfile; /*< close current and open new file */
size_t vn1; size_t vn1;
size_t vn2; size_t vn2;
thr = (skygw_thread_t *)data; thr = (skygw_thread_t *)data;
fwr = (filewriter_t *)skygw_thread_get_data(thr); fwr = (filewriter_t *)skygw_thread_get_data(thr);
flushall_logfiles(false);
CHK_FILEWRITER(fwr); CHK_FILEWRITER(fwr);
ss_debug(skygw_thread_set_state(thr, THR_RUNNING)); ss_debug(skygw_thread_set_state(thr, THR_RUNNING));
@ -2821,8 +2861,9 @@ static void* thr_filewriter_fun(
* Reset message to avoid redundant calls. * Reset message to avoid redundant calls.
*/ */
skygw_message_wait(fwr->fwr_logmes); skygw_message_wait(fwr->fwr_logmes);
if(skygw_thread_must_exit(thr)){
flushall_logfiles = skygw_thread_must_exit(thr); flushall_logfiles(true);
}
/** Process all logfiles which have buffered writes. */ /** Process all logfiles which have buffered writes. */
for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i <<= 1) for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i <<= 1)
@ -2831,6 +2872,10 @@ static void* thr_filewriter_fun(
/** /**
* Get file pointer of current logfile. * Get file pointer of current logfile.
*/ */
do_flushall = thr_flushall_check();
file = fwr->fwr_file[i]; file = fwr->fwr_file[i];
lf = &lm->lm_logfile[(logfile_id_t)i]; lf = &lm->lm_logfile[(logfile_id_t)i];
@ -2901,7 +2946,7 @@ static void* thr_filewriter_fun(
if (bb->bb_buf_used != 0 && if (bb->bb_buf_used != 0 &&
(flush_blockbuf == BB_FULL || (flush_blockbuf == BB_FULL ||
flush_logfile || flush_logfile ||
flushall_logfiles)) do_flushall))
{ {
/** /**
* buffer is at least half-full * buffer is at least half-full
@ -2920,7 +2965,7 @@ static void* thr_filewriter_fun(
(void *)bb->bb_buf, (void *)bb->bb_buf,
bb->bb_buf_used, bb->bb_buf_used,
(flush_logfile || (flush_logfile ||
flushall_logfiles)); do_flushall));
if (err) if (err)
{ {
fprintf(stderr, fprintf(stderr,
@ -2967,13 +3012,28 @@ static void* thr_filewriter_fun(
* Loop is restarted to ensure that all logfiles are * Loop is restarted to ensure that all logfiles are
* flushed. * flushed.
*/ */
if (!flushall_logfiles && skygw_thread_must_exit(thr))
if(flushall_started_flag){
flushall_started_flag = false;
flushall_done_flag = true;
i = LOGFILE_FIRST;
goto retry_flush_on_exit;
}
if (!thr_flushall_check() && skygw_thread_must_exit(thr))
{ {
flushall_logfiles = true; flushall_logfiles(true);
i = LOGFILE_FIRST; i = LOGFILE_FIRST;
goto retry_flush_on_exit; goto retry_flush_on_exit;
} }
} /* for */ }/* for */
if(flushall_done_flag){
flushall_done_flag = false;
flushall_logfiles(false);
skygw_message_send(fwr->fwr_clientmes);
}
} /* while (!skygw_thread_must_exit) */ } /* while (!skygw_thread_must_exit) */
ss_debug(skygw_thread_set_state(thr, THR_STOPPED)); ss_debug(skygw_thread_set_state(thr, THR_STOPPED));
@ -3061,7 +3121,7 @@ static int find_last_seqno(
} }
} }
if (check_file_and_path(filename, NULL)) if (check_file_and_path(filename, NULL, false))
{ {
seqno++; seqno++;
} }
@ -3073,4 +3133,34 @@ static int find_last_seqno(
free(snstr); free(snstr);
return seqno; return seqno;
} }
bool thr_flushall_check()
{
bool rval = false;
simple_mutex_lock(&lm->lm_mutex,true);
rval = flushall_flag;
if(rval && !flushall_started_flag && !flushall_done_flag){
flushall_started_flag = true;
}
simple_mutex_unlock(&lm->lm_mutex);
return rval;
}
void flushall_logfiles(bool flush)
{
simple_mutex_lock(&lm->lm_mutex,true);
flushall_flag = flush;
simple_mutex_unlock(&lm->lm_mutex);
}
/**
* Flush all log files synchronously
*/
void skygw_log_sync_all(void)
{
skygw_log_write(LOGFILE_TRACE,"Starting log flushing to disk.");
flushall_logfiles(true);
skygw_message_send(lm->lm_logmes);
skygw_message_wait(lm->lm_clientmes);
}

View File

@ -1441,47 +1441,55 @@ char* skygw_get_qtype_str(
/** /**
* Returns an array of strings of databases that this query uses. * Returns an array of strings of databases that this query uses.
* If the database isn't defined in the query, it is assumed that this query only targets the current database. * If the database isn't defined in the query, it is assumed that this query
* The value of @p size is set to the number of allocated strings. The caller is responsible for freeing all the allocated memory. * only targets the current database.
* The value of @p size is set to the number of allocated strings. The caller is
* responsible for freeing all the allocated memory.
* @param querybuf GWBUF containing the query * @param querybuf GWBUF containing the query
* @param size Size of the resulting array * @param size Size of the resulting array
* @return A new array of strings containing the database names or NULL if no databases were found. * @return A new array of strings containing the database names or NULL if no
* databases were found.
*/ */
char** skygw_get_database_names(GWBUF* querybuf,int* size) char** skygw_get_database_names(GWBUF* querybuf,int* size)
{ {
LEX* lex; LEX* lex;
TABLE_LIST* tbl; TABLE_LIST* tbl;
char **databases = NULL, **tmp = NULL; char **databases = NULL, **tmp = NULL;
int currsz = 0,i = 0; int currsz = 0,i = 0;
if( (lex = get_lex(querybuf)) == NULL) if( (lex = get_lex(querybuf)) == NULL)
{ {
goto retblock; goto retblock;
} }
lex->current_select = lex->all_selects_list; lex->current_select = lex->all_selects_list;
while(lex->current_select)
while(lex->current_select){ {
tbl = lex->current_select->join_list->head(); tbl = lex->current_select->join_list->head();
while(tbl) while(tbl)
{ {
if(strcmp(tbl->db,"skygw_virtual") != 0){ if(strcmp(tbl->db,"skygw_virtual") != 0)
if(i>= currsz){ {
tmp = (char**)realloc(databases,sizeof(char*)*(currsz*2 + 1)); if(i>= currsz)
if(tmp == NULL) goto retblock; {
tmp = (char**)realloc(databases,
sizeof(char*)*(currsz*2 + 1));
if(tmp == NULL)
{
goto retblock;
}
databases = tmp; databases = tmp;
currsz = currsz*2 + 1; currsz = currsz*2 + 1;
} }
databases[i++] = strdup(tbl->db); databases[i++] = strdup(tbl->db);
} }
tbl=tbl->next_local; tbl=tbl->next_local;
} }
lex->current_select = lex->current_select->next_select_in_list();
lex->current_select = lex->current_select->next_select_in_list(); }
}
retblock: retblock:
*size = i; *size = i;
return databases; return databases;
} }

View File

@ -31,7 +31,7 @@ threads=4
# backend_write_timeout=<timeout in seconds> # backend_write_timeout=<timeout in seconds>
# backend_read_timeout=<timeout in seconds> # backend_read_timeout=<timeout in seconds>
# #
## mysql_monitor specific options: ## MySQL monitor-specific options:
# #
# Enable detection of replication slaves lag via replication_heartbeat # Enable detection of replication slaves lag via replication_heartbeat
# table - optional. # table - optional.
@ -43,6 +43,13 @@ threads=4
# #
# detect_stale_master=[1|0] (default 0) # detect_stale_master=[1|0] (default 0)
# #
## Galera monitor-specific options:
#
# If disable_master_failback is not set, recovery of previously failed master
# causes mastership to be switched back to it. Enabling the option prevents it.
#
# disable_master_failback=[0|1] (default 0)
#
## Examples: ## Examples:
[MySQL Monitor] [MySQL Monitor]
@ -65,6 +72,7 @@ servers=server1,server2,server3
user=myuser user=myuser
passwd=mypwd passwd=mypwd
monitor_interval=10000 monitor_interval=10000
#disable_master_failback=
## Filter definition ## Filter definition
# #

View File

@ -75,18 +75,16 @@ struct router_instance;
typedef enum { typedef enum {
TARGET_UNDEFINED = 0x00, TARGET_UNDEFINED = 0x00,
TARGET_MASTER = 0x01, TARGET_MASTER = 0x01,
TARGET_SLAVE = 0x02, TARGET_SLAVE = 0x02,
TARGET_NAMED_SERVER = 0x04, TARGET_NAMED_SERVER = 0x04,
TARGET_ALL = 0x08, TARGET_ALL = 0x08,
TARGET_RLAG_MAX = 0x10 TARGET_RLAG_MAX = 0x10
} route_target_t; } route_target_t;
#define TARGET_IS_MASTER(t) (t & TARGET_MASTER)
#define TARGET_IS_SLAVE(t) (t & TARGET_SLAVE)
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER) #define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
#define TARGET_IS_ALL(t) (t & TARGET_ALL) #define TARGET_IS_ALL(t) (t & TARGET_ALL)
#define TARGET_IS_RLAG_MAX(t) (t & TARGET_RLAG_MAX)
typedef struct rses_property_st rses_property_t; typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES; typedef struct router_client_session ROUTER_CLIENT_SES;
@ -100,24 +98,6 @@ typedef enum rses_property_type_t {
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
} rses_property_type_t; } rses_property_type_t;
/**
* This criteria is used when backends are chosen for a router session's use.
* Backend servers are sorted to ascending order according to the criteria
* and top N are chosen.
*/
typedef enum select_criteria {
UNDEFINED_CRITERIA=0,
LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */
LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */
LEAST_BEHIND_MASTER,
LEAST_CURRENT_OPERATIONS,
DEFAULT_CRITERIA=LEAST_CURRENT_OPERATIONS,
LAST_CRITERIA /*< not used except for an index */
} select_criteria_t;
/** default values for rwsplit configuration parameters */ /** default values for rwsplit configuration parameters */
#define CONFIG_MAX_SLAVE_CONN 1 #define CONFIG_MAX_SLAVE_CONN 1
#define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */ #define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */
@ -232,11 +212,9 @@ typedef struct backend_ref_st {
} backend_ref_t; } backend_ref_t;
typedef struct rwsplit_config_st { typedef struct dbshard_config_st {
int rw_max_slave_conn_percent; int rw_max_slave_conn_percent;
int rw_max_slave_conn_count; int rw_max_slave_conn_count;
select_criteria_t rw_slave_select_criteria;
int rw_max_slave_replication_lag;
target_t rw_use_sql_variables_in; target_t rw_use_sql_variables_in;
} rwsplit_config_t; } rwsplit_config_t;

View File

@ -374,8 +374,8 @@ int rc;
rc = listen(listener->fd, SOMAXCONN); rc = listen(listener->fd, SOMAXCONN);
if (rc == 0) { if (rc == 0) {
LOGIF(LD, (skygw_log_write( LOGIF(LM, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_MESSAGE,
"Listening maxscale connections at %s\n", "Listening maxscale connections at %s\n",
config))); config)));
} else { } else {

View File

@ -577,8 +577,8 @@ int gw_send_authentication_to_backend(
dcb = conn->owner_dcb; dcb = conn->owner_dcb;
final_capabilities = gw_mysql_get_byte4((uint8_t *)&server_capabilities); final_capabilities = gw_mysql_get_byte4((uint8_t *)&server_capabilities);
/** Copy client's flags to backend */ /** Copy client's flags to backend but with the known capabilities mask */
final_capabilities |= conn->client_capabilities; final_capabilities |= (conn->client_capabilities & GW_MYSQL_CAPABILITIES_CLIENT);
/* get charset the client sent and use it for connection auth */ /* get charset the client sent and use it for connection auth */
charset = conn->charset; charset = conn->charset;

File diff suppressed because it is too large Load Diff

View File

@ -3220,8 +3220,6 @@ static bool select_connect_backend_servers(
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
} }
} }
master_connected = false;
slaves_connected = 0;
} }
return_succp: return_succp:

View File

@ -1680,12 +1680,12 @@ static bool file_write_header(
if (wbytes1 != 1 || wbytes2 != 1 || wbytes3 != 1 || wbytes4 != 1) { if (wbytes1 != 1 || wbytes2 != 1 || wbytes3 != 1 || wbytes4 != 1) {
fprintf(stderr, fprintf(stderr,
"* Writing header %s %s %s to %s failed.\n", "\nError : Writing header %s %s %s %s failed.\n",
header_buf1, header_buf1,
header_buf2, header_buf2,
header_buf3, header_buf3,
header_buf4); header_buf4);
perror("Logfile header write.\n"); perror("Logfile header write");
goto return_succp; goto return_succp;
} }
#endif #endif
@ -1757,11 +1757,11 @@ static bool file_write_footer(
if (wbytes1 != 1 || wbytes3 != 1 || wbytes4 != 1) if (wbytes1 != 1 || wbytes3 != 1 || wbytes4 != 1)
{ {
fprintf(stderr, fprintf(stderr,
"* Writing header %s %s to %s failed.\n", "\nError : Writing header %s %s to %s failed.\n",
header_buf1, header_buf1,
header_buf3, header_buf3,
header_buf4); header_buf4);
perror("Logfile header write.\n"); perror("Logfile header write");
goto return_succp; goto return_succp;
} }
#endif #endif
@ -1875,7 +1875,7 @@ skygw_file_t* skygw_file_init(
int eno = errno; int eno = errno;
errno = 0; errno = 0;
fprintf(stderr, fprintf(stderr,
"* Writing header of log file %s failed due %d, %s.\n", "\nError : Writing header of log file %s failed due %d, %s.\n",
file->sf_fname, file->sf_fname,
eno, eno,
strerror(eno)); strerror(eno));