Merge branch '2.1' into develop

This commit is contained in:
Markus Mäkelä
2017-02-20 18:39:41 +02:00
13 changed files with 213 additions and 93 deletions

View File

@ -77,6 +77,7 @@ typedef struct
bool failover; /**< If simple failover is enabled */
int failcount; /**< How many monitoring cycles servers must be
down before failover is initiated */
bool failover_recovery; /**< Allow servers to rejoin the cluster in failover mode */
bool warn_failover; /**< Log a warning when failover happens */
} MYSQL_MONITOR;

View File

@ -127,6 +127,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
{"multimaster", MXS_MODULE_PARAM_BOOL, "false"},
{"failover", MXS_MODULE_PARAM_BOOL, "false"},
{"failcount", MXS_MODULE_PARAM_COUNT, "5"},
{"failover_recovery", MXS_MODULE_PARAM_BOOL, "false"},
{
"script",
MXS_MODULE_PARAM_PATH,
@ -280,6 +281,7 @@ startMonitor(MXS_MONITOR *monitor, const MXS_CONFIG_PARAMETER* params)
handle->multimaster = config_get_bool(params, "multimaster");
handle->failover = config_get_bool(params, "failover");
handle->failcount = config_get_integer(params, "failcount");
handle->failover_recovery = config_get_bool(params, "failover_recovery");
handle->mysql51_replication = config_get_bool(params, "mysql51_replication");
handle->script = config_copy_string(params, "script");
handle->events = config_get_enum(params, "events", mxs_monitor_event_enum_values);
@ -1006,9 +1008,10 @@ void do_failover(MYSQL_MONITOR *handle, MXS_MONITOR_SERVERS *db)
{
if (!SERVER_IS_MASTER(db->server) && handle->warn_failover)
{
MXS_WARNING("Failover initiated, server '%s' is now the master. "
"All other servers are set into maintenance mode.",
db->server->unique_name);
MXS_WARNING("Failover initiated, server '%s' is now the master.%s",
db->server->unique_name,
handle->failover_recovery ?
"" : " All other servers are set into maintenance mode.");
handle->warn_failover = false;
}
@ -1016,7 +1019,7 @@ void do_failover(MYSQL_MONITOR *handle, MXS_MONITOR_SERVERS *db)
monitor_set_pending_status(db, SERVER_MASTER);
monitor_clear_pending_status(db, SERVER_SLAVE);
}
else
else if (!handle->failover_recovery)
{
server_set_status_nolock(db->server, SERVER_MAINT);
monitor_set_pending_status(db, SERVER_MAINT);

View File

@ -991,7 +991,8 @@ extract_message(GWBUF *errpkt)
*
*/
static void
errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, mxs_error_action_t action,
errorReply(MXS_ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb,
mxs_error_action_t action,
bool *succp)
{
/** We should never end up here */

View File

@ -788,25 +788,25 @@ static bool avro_client_stream_data(AVRO_CLIENT *client)
{
switch (client->format)
{
case AVRO_FORMAT_JSON:
/** Currently only JSON format supports seeking to a GTID */
if (client->requested_gtid &&
seek_to_index_pos(client, client->file_handle) &&
seek_to_gtid(client, client->file_handle))
{
client->requested_gtid = false;
}
case AVRO_FORMAT_JSON:
/** Currently only JSON format supports seeking to a GTID */
if (client->requested_gtid &&
seek_to_index_pos(client, client->file_handle) &&
seek_to_gtid(client, client->file_handle))
{
client->requested_gtid = false;
}
read_more = stream_json(client);
break;
read_more = stream_json(client);
break;
case AVRO_FORMAT_AVRO:
read_more = stream_binary(client);
break;
case AVRO_FORMAT_AVRO:
read_more = stream_binary(client);
break;
default:
MXS_ERROR("Unexpected format: %d", client->format);
break;
default:
MXS_ERROR("Unexpected format: %d", client->format);
break;
}
@ -847,13 +847,15 @@ GWBUF* read_avro_json_schema(const char *avrofile, const char* dir)
if (file)
{
int nread;
while ((nread = fread(buffer, 1, sizeof(buffer), file)) > 0)
while ((nread = fread(buffer, 1, sizeof(buffer) - 1, file)) > 0)
{
while (isspace(buffer[nread - 1]))
{
nread--;
}
buffer[nread++] = '\n';
GWBUF * newbuf = gwbuf_alloc_and_load(nread, buffer);
if (newbuf)

View File

@ -88,18 +88,22 @@ void avro_index_file(AVRO_INSTANCE *router, const char* filename)
snprintf(sql, sizeof(sql), "SELECT position FROM "INDEX_TABLE_NAME
" WHERE filename=\"%s\";", name);
if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to read last indexed position of file '%s': %s",
name, errmsg);
sqlite3_free(errmsg);
maxavro_file_close(file);
return;
}
else if (pos > 0)
/** Continue from last position */
if (pos > 0 && !maxavro_record_set_pos(file, pos))
{
/** Continue from last position */
maxavro_record_set_pos(file, pos);
maxavro_file_close(file);
return;
}
sqlite3_free(errmsg);
errmsg = NULL;
gtid_pos_t prev_gtid = {0, 0, 0, 0, 0};

View File

@ -352,44 +352,64 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
*/
void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadata, uint8_t *value)
{
int64_t i = 0;
switch (type)
{
case TABLE_COL_TYPE_TINY:
i = *value;
avro_value_set_int(field, i);
break;
{
char c = *value;
avro_value_set_int(field, c);
break;
}
case TABLE_COL_TYPE_SHORT:
memcpy(&i, value, 2);
avro_value_set_int(field, i);
break;
{
short s = gw_mysql_get_byte2(value);
avro_value_set_int(field, s);
break;
}
case TABLE_COL_TYPE_INT24:
memcpy(&i, value, 3);
avro_value_set_int(field, i);
break;
{
int x = gw_mysql_get_byte3(value);
if (x & 0x800000)
{
x = -((0xffffff & (~x)) + 1);
}
avro_value_set_int(field, x);
break;
}
case TABLE_COL_TYPE_LONG:
memcpy(&i, value, 4);
avro_value_set_int(field, i);
break;
{
int x = gw_mysql_get_byte4(value);
avro_value_set_int(field, x);
break;
}
case TABLE_COL_TYPE_LONGLONG:
memcpy(&i, value, 8);
avro_value_set_int(field, i);
break;
{
long l = gw_mysql_get_byte8(value);
avro_value_set_long(field, l);
break;
}
case TABLE_COL_TYPE_FLOAT:
memcpy(&i, value, 4);
avro_value_set_float(field, (float)i);
break;
{
float f = 0;
memcpy(&f, value, 4);
avro_value_set_float(field, f);
break;
}
case TABLE_COL_TYPE_DOUBLE:
memcpy(&i, value, 8);
avro_value_set_float(field, (double)i);
break;
{
double d = 0;
memcpy(&d, value, 8);
avro_value_set_double(field, d);
break;
}
default:
break;